summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile5
-rw-r--r--include/rabbit_exchange_behaviour_spec.hrl41
-rw-r--r--include/rabbit_framing_spec.hrl2
-rw-r--r--src/rabbit_access_control.erl12
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_exchange.erl293
-rw-r--r--src/rabbit_exchange_behaviour.erl50
-rw-r--r--src/rabbit_exchange_type_direct.erl53
-rw-r--r--src/rabbit_exchange_type_fanout.erl52
-rw-r--r--src/rabbit_exchange_type_headers.erl127
-rw-r--r--src/rabbit_exchange_type_topic.erl90
-rw-r--r--src/rabbit_misc.erl5
-rw-r--r--src/rabbit_router.erl45
-rw-r--r--src/rabbit_tests.erl2
15 files changed, 220 insertions, 561 deletions
diff --git a/Makefile b/Makefile
index 1cd48df2..db8f7001 100644
--- a/Makefile
+++ b/Makefile
@@ -64,10 +64,7 @@ $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app
$(EBIN_DIR)/gen_server2.beam: $(SOURCE_DIR)/gen_server2.erl
erlc $(ERLC_OPTS) $<
-$(EBIN_DIR)/rabbit_exchange_behaviour.beam: $(SOURCE_DIR)/rabbit_exchange_behaviour.erl
- erlc $(ERLC_OPTS) $<
-
-$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl $(EBIN_DIR)/gen_server2.beam $(EBIN_DIR)/rabbit_exchange_behaviour.beam
+$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl $(EBIN_DIR)/gen_server2.beam
erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
# ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
diff --git a/include/rabbit_exchange_behaviour_spec.hrl b/include/rabbit_exchange_behaviour_spec.hrl
deleted file mode 100644
index 30662af8..00000000
--- a/include/rabbit_exchange_behaviour_spec.hrl
+++ /dev/null
@@ -1,41 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
--ifdef(use_specs).
-
--spec(description/0 :: () -> [{atom(), any()}]).
--spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}).
--spec(recover/1 :: (exchange()) -> 'ok').
--spec(init/1 :: (exchange()) -> 'ok').
--spec(delete/1 :: (exchange()) -> 'ok').
--spec(add_binding/2 :: (exchange(), binding()) -> 'ok').
--spec(delete_binding/2 :: (exchange(), binding()) -> 'ok').
-
--endif.
diff --git a/include/rabbit_framing_spec.hrl b/include/rabbit_framing_spec.hrl
index 16af8ad3..a78c2301 100644
--- a/include/rabbit_framing_spec.hrl
+++ b/include/rabbit_framing_spec.hrl
@@ -56,5 +56,5 @@
-type(password() :: binary()).
-type(vhost() :: binary()).
-type(ctag() :: binary()).
--type(exchange_type() :: atom()).
+-type(exchange_type() :: 'direct' | 'topic' | 'fanout').
-type(binding_key() :: binary()).
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index daf6f5af..6ff7a104 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -242,12 +242,12 @@ add_vhost(VHostPath) ->
rabbit_misc:r(VHostPath, exchange, Name),
Type, true, false, []) ||
{Name,Type} <-
- [{<<"">>, rabbit_exchange_type_direct},
- {<<"amq.direct">>, rabbit_exchange_type_direct},
- {<<"amq.topic">>, rabbit_exchange_type_topic},
- {<<"amq.match">>, rabbit_exchange_type_headers}, %% per 0-9-1 pdf
- {<<"amq.headers">>, rabbit_exchange_type_headers}, %% per 0-9-1 xml
- {<<"amq.fanout">>, rabbit_exchange_type_fanout}]],
+ [{<<"">>, direct},
+ {<<"amq.direct">>, direct},
+ {<<"amq.topic">>, topic},
+ {<<"amq.match">>, headers}, %% per 0-9-1 pdf
+ {<<"amq.headers">>, headers}, %% per 0-9-1 xml
+ {<<"amq.fanout">>, fanout}]],
ok;
[_] ->
mnesia:abort({vhost_already_exists, VHostPath})
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 507dab48..c20cb16c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1,4 +1,4 @@
-%% The contents of this file are subject to the Mozilla Public License
+%% The contents of this file are subject to the Mozilla Public Licenses
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index 297ed5aa..b28574b7 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -42,7 +42,7 @@
init([DefaultVHost]) ->
#exchange{} = rabbit_exchange:declare(
rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
- rabbit_exchange_type_topic, true, false, []),
+ topic, true, false, []),
{ok, #resource{virtual_host = DefaultVHost,
kind = exchange,
name = ?LOG_EXCH_NAME}}.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 09ea1e96..33dea8c7 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -30,6 +30,7 @@
%%
-module(rabbit_exchange).
+-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
@@ -39,7 +40,7 @@
-export([add_binding/4, delete_binding/4, list_bindings/1]).
-export([delete/2]).
-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
--export([check_type/1, assert_type/2]).
+-export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]).
%% EXTENDED API
-export([list_exchange_bindings/1]).
@@ -48,6 +49,7 @@
-import(mnesia).
-import(sets).
-import(lists).
+-import(qlc).
-import(regexp).
%%----------------------------------------------------------------------------
@@ -81,6 +83,8 @@
[{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
-spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok').
-spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok').
+-spec(topic_matches/2 :: (binary(), binary()) -> boolean()).
+-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()).
-spec(delete/2 :: (exchange_name(), boolean()) ->
'ok' | not_found() | {'error', 'in_use'}).
-spec(list_queue_bindings/1 :: (queue_name()) ->
@@ -105,13 +109,7 @@ recover() ->
Route, write),
ok = mnesia:write(rabbit_reverse_route,
ReverseRoute, write)
- end, rabbit_durable_route),
- %% Tell exchanges to recover themselves only *after* we've
- %% recovered their bindings.
- ok = rabbit_misc:table_foreach(
- fun(Exchange = #exchange{type = Type}) ->
- ok = Type:recover(Exchange)
- end, rabbit_durable_exchange).
+ end, rabbit_durable_route).
declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
Exchange = #exchange{name = ExchangeName,
@@ -128,37 +126,22 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
Exchange, write);
true -> ok
end,
- ok = Type:init(Exchange),
Exchange;
[ExistingX] -> ExistingX
end
end).
-typename_to_plugin_module(T) when is_binary(T) ->
- case catch list_to_existing_atom("rabbit_exchange_type_" ++ binary_to_list(T)) of
- {'EXIT', {badarg, _}} ->
- rabbit_misc:protocol_error(
- command_invalid, "invalid exchange type '~s'", [T]);
- Module ->
- Module
- end.
-
-plugin_module_to_typename(M) when is_atom(M) ->
- "rabbit_exchange_type_" ++ S = atom_to_list(M),
- list_to_binary(S).
-
+check_type(<<"fanout">>) ->
+ fanout;
+check_type(<<"direct">>) ->
+ direct;
+check_type(<<"topic">>) ->
+ topic;
+check_type(<<"headers">>) ->
+ headers;
check_type(T) ->
- Module = typename_to_plugin_module(T),
- case catch Module:description() of
- {'EXIT', {undef, [{_, description, []} | _]}} ->
- rabbit_misc:protocol_error(
- command_invalid, "invalid exchange type '~s'", [T]);
- {'EXIT', _} ->
- rabbit_misc:protocol_error(
- command_invalid, "problem loading exchange type '~s'", [T]);
- _ ->
- Module
- end.
+ rabbit_misc:protocol_error(
+ command_invalid, "invalid exchange type '~s'", [T]).
assert_type(#exchange{ type = ActualType }, RequiredType)
when ActualType == RequiredType ->
@@ -166,9 +149,7 @@ assert_type(#exchange{ type = ActualType }, RequiredType)
assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) ->
rabbit_misc:protocol_error(
not_allowed, "cannot redeclare ~s of type '~s' with type '~s'",
- [rabbit_misc:rs(Name),
- plugin_module_to_typename(ActualType),
- plugin_module_to_typename(RequiredType)]).
+ [rabbit_misc:rs(Name), ActualType, RequiredType]).
lookup(Name) ->
rabbit_misc:dirty_read({rabbit_exchange, Name}).
@@ -192,7 +173,7 @@ map(VHostPath, F) ->
infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items].
i(name, #exchange{name = Name}) -> Name;
-i(type, #exchange{type = Type}) -> plugin_module_to_typename(Type);
+i(type, #exchange{type = Type}) -> Type;
i(durable, #exchange{durable = Durable}) -> Durable;
i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete;
i(arguments, #exchange{arguments = Arguments}) -> Arguments;
@@ -209,8 +190,9 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
publish(X, Delivery) ->
publish(X, [], Delivery).
-publish(X = #exchange{type = Type}, Seen, Delivery) ->
- case Type:publish(X, Delivery) of
+publish(X, Seen, Delivery = #delivery{
+ message = #basic_message{routing_key = RK, content = C}}) ->
+ case rabbit_router:deliver(route(X, RK, C), Delivery) of
{_, []} = R ->
#exchange{name = XName, arguments = Args} = X,
case rabbit_misc:r_arg(XName, exchange, Args,
@@ -240,6 +222,75 @@ publish(X = #exchange{type = Type}, Seen, Delivery) ->
R
end.
+%% return the list of qpids to which a message with a given routing
+%% key, sent to a particular exchange, should be delivered.
+%%
+%% The function ensures that a qpid appears in the return list exactly
+%% as many times as a message should be delivered to it. With the
+%% current exchange types that is at most once.
+route(X = #exchange{type = topic}, RoutingKey, _Content) ->
+ match_bindings(X, fun (#binding{key = BindingKey}) ->
+ topic_matches(BindingKey, RoutingKey)
+ end);
+
+route(X = #exchange{type = headers}, _RoutingKey, Content) ->
+ Headers = case (Content#content.properties)#'P_basic'.headers of
+ undefined -> [];
+ H -> sort_arguments(H)
+ end,
+ match_bindings(X, fun (#binding{args = Spec}) ->
+ headers_match(Spec, Headers)
+ end);
+
+route(X = #exchange{type = fanout}, _RoutingKey, _Content) ->
+ match_routing_key(X, '_');
+
+route(X = #exchange{type = direct}, RoutingKey, _Content) ->
+ match_routing_key(X, RoutingKey).
+
+sort_arguments(Arguments) ->
+ lists:keysort(1, Arguments).
+
+%% TODO: Maybe this should be handled by a cursor instead.
+%% TODO: This causes a full scan for each entry with the same exchange
+match_bindings(#exchange{name = Name}, Match) ->
+ Query = qlc:q([QName || #route{binding = Binding = #binding{
+ exchange_name = ExchangeName,
+ queue_name = QName}} <-
+ mnesia:table(rabbit_route),
+ ExchangeName == Name,
+ Match(Binding)]),
+ lookup_qpids(
+ try
+ mnesia:async_dirty(fun qlc:e/1, [Query])
+ catch exit:{aborted, {badarg, _}} ->
+ %% work around OTP-7025, which was fixed in R12B-1, by
+ %% falling back on a less efficient method
+ [QName || #route{binding = Binding = #binding{
+ queue_name = QName}} <-
+ mnesia:dirty_match_object(
+ rabbit_route,
+ #route{binding = #binding{exchange_name = Name,
+ _ = '_'}}),
+ Match(Binding)]
+ end).
+
+match_routing_key(#exchange{name = Name}, RoutingKey) ->
+ MatchHead = #route{binding = #binding{exchange_name = Name,
+ queue_name = '$1',
+ key = RoutingKey,
+ _ = '_'}},
+ lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])).
+
+lookup_qpids(Queues) ->
+ sets:fold(
+ fun(Key, Acc) ->
+ case mnesia:dirty_read({rabbit_queue, Key}) of
+ [#amqqueue{pid = QPid}] -> [QPid | Acc];
+ [] -> Acc
+ end
+ end, [], sets:from_list(Queues)).
+
%% TODO: Should all of the route and binding management not be
%% refactored to its own module, especially seeing as unbind will have
%% to be implemented for 0.91 ?
@@ -263,43 +314,21 @@ delete_transient_queue_bindings(QueueName) ->
delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1).
delete_queue_bindings(QueueName, FwdDeleteFun) ->
- DeletedBindings =
- [begin
- FwdRoute = reverse_route(Route),
- ok = FwdDeleteFun(FwdRoute),
- ok = mnesia:delete_object(rabbit_reverse_route, Route, write),
- FwdRoute#route.binding
- end || Route <- mnesia:match_object(
- rabbit_reverse_route,
- reverse_route(
- #route{binding = #binding{queue_name = QueueName,
- _ = '_'}}),
- write)],
- %% We need the keysort to group the bindings by exchange name, so
- %% that cleanup_deleted_queue_bindings can inform the exchange of
- %% its vanished bindings before maybe_auto_delete'ing the
- %% exchange.
- ok = cleanup_deleted_queue_bindings(lists:keysort(#binding.exchange_name, DeletedBindings),
- none, []).
-
-%% Requires that its input binding list is sorted in exchange-name
-%% order, so that the grouping of bindings (for passing to
-%% cleanup_deleted_queue_bindings1) works properly.
-cleanup_deleted_queue_bindings([], ExchangeName, Bindings) ->
- cleanup_deleted_queue_bindings1(ExchangeName, Bindings);
-cleanup_deleted_queue_bindings([B = #binding{exchange_name = N} | Rest], ExchangeName, Bindings)
- when N =:= ExchangeName ->
- cleanup_deleted_queue_bindings(Rest, ExchangeName, [B | Bindings]);
-cleanup_deleted_queue_bindings([B = #binding{exchange_name = N} | Rest], ExchangeName, Bindings) ->
- cleanup_deleted_queue_bindings1(ExchangeName, Bindings),
- cleanup_deleted_queue_bindings(Rest, N, [B]).
-
-cleanup_deleted_queue_bindings1(none, []) ->
- ok;
-cleanup_deleted_queue_bindings1(ExchangeName, Bindings) ->
- [X = #exchange{type = Type}] = mnesia:read({rabbit_exchange, ExchangeName}),
- [ok = Type:delete_binding(X, B) || B <- Bindings],
- ok = maybe_auto_delete(X).
+ Exchanges = exchanges_for_queue(QueueName),
+ [begin
+ ok = FwdDeleteFun(reverse_route(Route)),
+ ok = mnesia:delete_object(rabbit_reverse_route, Route, write)
+ end || Route <- mnesia:match_object(
+ rabbit_reverse_route,
+ reverse_route(
+ #route{binding = #binding{queue_name = QueueName,
+ _ = '_'}}),
+ write)],
+ [begin
+ [X] = mnesia:read({rabbit_exchange, ExchangeName}),
+ ok = maybe_auto_delete(X)
+ end || ExchangeName <- Exchanges],
+ ok.
delete_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write),
@@ -308,6 +337,15 @@ delete_forward_routes(Route) ->
delete_transient_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write).
+exchanges_for_queue(QueueName) ->
+ MatchHead = reverse_route(
+ #route{binding = #binding{exchange_name = '$1',
+ queue_name = QueueName,
+ _ = '_'}}),
+ sets:to_list(
+ sets:from_list(
+ mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))).
+
contains(Table, MatchHead) ->
try
continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read))
@@ -346,25 +384,23 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) ->
add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
binding_action(
ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X = #exchange{type = Type}, Q, B) ->
+ fun (X, Q, B) ->
if Q#amqqueue.durable and not(X#exchange.durable) ->
{error, durability_settings_incompatible};
true -> ok = sync_binding(B, Q#amqqueue.durable,
- fun mnesia:write/3),
- ok = Type:add_binding(X, B)
+ fun mnesia:write/3)
end
end).
delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
binding_action(
ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X = #exchange{type = Type}, Q, B) ->
+ fun (X, Q, B) ->
case mnesia:match_object(rabbit_route, #route{binding = B},
write) of
[] -> {error, binding_not_found};
_ -> ok = sync_binding(B, Q#amqqueue.durable,
fun mnesia:delete_object/3),
- ok = Type:delete_binding(X, B),
maybe_auto_delete(X)
end
end).
@@ -376,7 +412,7 @@ binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) ->
Fun(X, Q, #binding{exchange_name = ExchangeName,
queue_name = QueueName,
key = RoutingKey,
- args = rabbit_misc:sort_field_table(Arguments)})
+ args = sort_arguments(Arguments)})
end).
sync_binding(Binding, Durable, Fun) ->
@@ -434,6 +470,94 @@ reverse_binding(#binding{exchange_name = Exchange,
key = Key,
args = Args}.
+default_headers_match_kind() -> all.
+
+parse_x_match(<<"all">>) -> all;
+parse_x_match(<<"any">>) -> any;
+parse_x_match(Other) ->
+ rabbit_log:warning("Invalid x-match field value ~p; expected all or any",
+ [Other]),
+ default_headers_match_kind().
+
+%% Horrendous matching algorithm. Depends for its merge-like
+%% (linear-time) behaviour on the lists:keysort (sort_arguments) that
+%% route/3 and {add,delete}_binding/4 do.
+%%
+%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
+%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+%%
+headers_match(Pattern, Data) ->
+ MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of
+ {value, {_, longstr, MK}} -> parse_x_match(MK);
+ {value, {_, Type, MK}} ->
+ rabbit_log:warning("Invalid x-match field type ~p "
+ "(value ~p); expected longstr",
+ [Type, MK]),
+ default_headers_match_kind();
+ _ -> default_headers_match_kind()
+ end,
+ headers_match(Pattern, Data, true, false, MatchKind).
+
+headers_match([], _Data, AllMatch, _AnyMatch, all) ->
+ AllMatch;
+headers_match([], _Data, _AllMatch, AnyMatch, any) ->
+ AnyMatch;
+headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data,
+ AllMatch, AnyMatch, MatchKind) ->
+ headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind);
+headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) ->
+ headers_match([], [], false, AnyMatch, MatchKind);
+headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest],
+ AllMatch, AnyMatch, MatchKind) when PK > DK ->
+ headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind);
+headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _],
+ _AllMatch, AnyMatch, MatchKind) when PK < DK ->
+ headers_match(PRest, Data, false, AnyMatch, MatchKind);
+headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
+ AllMatch, AnyMatch, MatchKind) when PK == DK ->
+ {AllMatch1, AnyMatch1} =
+ if
+ %% It's not properly specified, but a "no value" in a
+ %% pattern field is supposed to mean simple presence of
+ %% the corresponding data field. I've interpreted that to
+ %% mean a type of "void" for the pattern field.
+ PT == void -> {AllMatch, true};
+ %% Similarly, it's not specified, but I assume that a
+ %% mismatched type causes a mismatched value.
+ PT =/= DT -> {false, AnyMatch};
+ PV == DV -> {AllMatch, true};
+ true -> {false, AnyMatch}
+ end,
+ headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind).
+
+split_topic_key(Key) ->
+ {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."),
+ KeySplit.
+
+topic_matches(PatternKey, RoutingKey) ->
+ P = split_topic_key(PatternKey),
+ R = split_topic_key(RoutingKey),
+ topic_matches1(P, R).
+
+topic_matches1(["#"], _R) ->
+ true;
+topic_matches1(["#" | PTail], R) ->
+ last_topic_match(PTail, [], lists:reverse(R));
+topic_matches1([], []) ->
+ true;
+topic_matches1(["*" | PatRest], [_ | ValRest]) ->
+ topic_matches1(PatRest, ValRest);
+topic_matches1([PatElement | PatRest], [ValElement | ValRest]) when PatElement == ValElement ->
+ topic_matches1(PatRest, ValRest);
+topic_matches1(_, _) ->
+ false.
+
+last_topic_match(P, R, []) ->
+ topic_matches1(P, R);
+last_topic_match(P, R, [BacktrackNext | BacktrackList]) ->
+ topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList).
+
delete(ExchangeName, _IfUnused = true) ->
call_with_exchange(ExchangeName, fun conditional_delete/1);
delete(ExchangeName, _IfUnused = false) ->
@@ -455,11 +579,10 @@ conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
true -> {error, in_use}
end.
-unconditional_delete(X = #exchange{name = ExchangeName, type = Type}) ->
+unconditional_delete(#exchange{name = ExchangeName}) ->
ok = delete_exchange_bindings(ExchangeName),
ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}),
- ok = mnesia:delete({rabbit_exchange, ExchangeName}),
- ok = Type:delete(X).
+ ok = mnesia:delete({rabbit_exchange, ExchangeName}).
%%----------------------------------------------------------------------------
%% EXTENDED API
diff --git a/src/rabbit_exchange_behaviour.erl b/src/rabbit_exchange_behaviour.erl
deleted file mode 100644
index 7935df6b..00000000
--- a/src/rabbit_exchange_behaviour.erl
+++ /dev/null
@@ -1,50 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_exchange_behaviour).
-
--export([behaviour_info/1]).
-
-behaviour_info(callbacks) ->
- [
- %% Called *outside* mnesia transactions.
- {description, 0},
- {publish, 2},
-
- %% Called *inside* mnesia transactions, must be idempotent.
- {recover, 1}, %% like init, but called on server startup for durable exchanges
- {init, 1}, %% like recover, but called on declaration when previously absent
- {delete, 1}, %% called on deletion
- {add_binding, 2},
- {delete_binding, 2}
- ];
-behaviour_info(_Other) ->
- undefined.
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
deleted file mode 100644
index e6e6ae99..00000000
--- a/src/rabbit_exchange_type_direct.erl
+++ /dev/null
@@ -1,53 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_exchange_type_direct).
--include("rabbit.hrl").
-
--behaviour(rabbit_exchange_behaviour).
-
--export([description/0, publish/2]).
--export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]).
--include("rabbit_exchange_behaviour_spec.hrl").
-
-description() ->
- [{name, <<"direct">>},
- {description, <<"AMQP direct exchange, as per the AMQP specification">>}].
-
-publish(#exchange{name = Name},
- Delivery = #delivery{message = #basic_message{routing_key = RoutingKey}}) ->
- rabbit_router:deliver(rabbit_router:match_routing_key(Name, RoutingKey), Delivery).
-
-recover(_X) -> ok.
-init(_X) -> ok.
-delete(_X) -> ok.
-add_binding(_X, _B) -> ok.
-delete_binding(_X, _B) -> ok.
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
deleted file mode 100644
index 2194abd4..00000000
--- a/src/rabbit_exchange_type_fanout.erl
+++ /dev/null
@@ -1,52 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_exchange_type_fanout).
--include("rabbit.hrl").
-
--behaviour(rabbit_exchange_behaviour).
-
--export([description/0, publish/2]).
--export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]).
--include("rabbit_exchange_behaviour_spec.hrl").
-
-description() ->
- [{name, <<"fanout">>},
- {description, <<"AMQP fanout exchange, as per the AMQP specification">>}].
-
-publish(#exchange{name = Name}, Delivery) ->
- rabbit_router:deliver(rabbit_router:match_routing_key(Name, '_'), Delivery).
-
-recover(_X) -> ok.
-init(_X) -> ok.
-delete(_X) -> ok.
-add_binding(_X, _B) -> ok.
-delete_binding(_X, _B) -> ok.
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
deleted file mode 100644
index 72c85b06..00000000
--- a/src/rabbit_exchange_type_headers.erl
+++ /dev/null
@@ -1,127 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_exchange_type_headers).
--include("rabbit.hrl").
--include("rabbit_framing.hrl").
-
--behaviour(rabbit_exchange_behaviour).
-
--export([description/0, publish/2]).
--export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]).
--include("rabbit_exchange_behaviour_spec.hrl").
-
--ifdef(use_specs).
--spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()).
--endif.
-
-description() ->
- [{name, <<"headers">>},
- {description, <<"AMQP headers exchange, as per the AMQP specification">>}].
-
-publish(#exchange{name = Name},
- Delivery = #delivery{message = #basic_message{content = Content}}) ->
- Headers = case (Content#content.properties)#'P_basic'.headers of
- undefined -> [];
- H -> rabbit_misc:sort_field_table(H)
- end,
- rabbit_router:deliver(rabbit_router:match_bindings(Name, fun (#binding{args = Spec}) ->
- headers_match(Spec, Headers)
- end),
- Delivery).
-
-default_headers_match_kind() -> all.
-
-parse_x_match(<<"all">>) -> all;
-parse_x_match(<<"any">>) -> any;
-parse_x_match(Other) ->
- rabbit_log:warning("Invalid x-match field value ~p; expected all or any",
- [Other]),
- default_headers_match_kind().
-
-%% Horrendous matching algorithm. Depends for its merge-like
-%% (linear-time) behaviour on the lists:keysort
-%% (rabbit_misc:sort_field_table) that route/3 and
-%% rabbit_exchange:{add,delete}_binding/4 do.
-%%
-%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
-%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
-%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
-%%
-headers_match(Pattern, Data) ->
- MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of
- {value, {_, longstr, MK}} -> parse_x_match(MK);
- {value, {_, Type, MK}} ->
- rabbit_log:warning("Invalid x-match field type ~p "
- "(value ~p); expected longstr",
- [Type, MK]),
- default_headers_match_kind();
- _ -> default_headers_match_kind()
- end,
- headers_match(Pattern, Data, true, false, MatchKind).
-
-headers_match([], _Data, AllMatch, _AnyMatch, all) ->
- AllMatch;
-headers_match([], _Data, _AllMatch, AnyMatch, any) ->
- AnyMatch;
-headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data,
- AllMatch, AnyMatch, MatchKind) ->
- headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind);
-headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) ->
- headers_match([], [], false, AnyMatch, MatchKind);
-headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest],
- AllMatch, AnyMatch, MatchKind) when PK > DK ->
- headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind);
-headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _],
- _AllMatch, AnyMatch, MatchKind) when PK < DK ->
- headers_match(PRest, Data, false, AnyMatch, MatchKind);
-headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
- AllMatch, AnyMatch, MatchKind) when PK == DK ->
- {AllMatch1, AnyMatch1} =
- if
- %% It's not properly specified, but a "no value" in a
- %% pattern field is supposed to mean simple presence of
- %% the corresponding data field. I've interpreted that to
- %% mean a type of "void" for the pattern field.
- PT == void -> {AllMatch, true};
- %% Similarly, it's not specified, but I assume that a
- %% mismatched type causes a mismatched value.
- PT =/= DT -> {false, AnyMatch};
- PV == DV -> {AllMatch, true};
- true -> {false, AnyMatch}
- end,
- headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind).
-
-recover(_X) -> ok.
-init(_X) -> ok.
-delete(_X) -> ok.
-add_binding(_X, _B) -> ok.
-delete_binding(_X, _B) -> ok.
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
deleted file mode 100644
index 738ff595..00000000
--- a/src/rabbit_exchange_type_topic.erl
+++ /dev/null
@@ -1,90 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_exchange_type_topic).
--include("rabbit.hrl").
-
--behaviour(rabbit_exchange_behaviour).
-
--export([description/0, publish/2]).
--export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]).
--include("rabbit_exchange_behaviour_spec.hrl").
-
--export([topic_matches/2]).
-
--ifdef(use_specs).
--spec(topic_matches/2 :: (binary(), binary()) -> boolean()).
--endif.
-
-description() ->
- [{name, <<"topic">>},
- {description, <<"AMQP topic exchange, as per the AMQP specification">>}].
-
-publish(#exchange{name = Name},
- Delivery = #delivery{message = #basic_message{routing_key = RoutingKey}}) ->
- rabbit_router:deliver(rabbit_router:match_bindings(Name,
- fun (#binding{key = BindingKey}) ->
- topic_matches(BindingKey, RoutingKey)
- end),
- Delivery).
-
-split_topic_key(Key) ->
- {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."),
- KeySplit.
-
-topic_matches(PatternKey, RoutingKey) ->
- P = split_topic_key(PatternKey),
- R = split_topic_key(RoutingKey),
- topic_matches1(P, R).
-
-topic_matches1(["#"], _R) ->
- true;
-topic_matches1(["#" | PTail], R) ->
- last_topic_match(PTail, [], lists:reverse(R));
-topic_matches1([], []) ->
- true;
-topic_matches1(["*" | PatRest], [_ | ValRest]) ->
- topic_matches1(PatRest, ValRest);
-topic_matches1([PatElement | PatRest], [ValElement | ValRest]) when PatElement == ValElement ->
- topic_matches1(PatRest, ValRest);
-topic_matches1(_, _) ->
- false.
-
-last_topic_match(P, R, []) ->
- topic_matches1(P, R);
-last_topic_match(P, R, [BacktrackNext | BacktrackList]) ->
- topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList).
-
-recover(_X) -> ok.
-init(_X) -> ok.
-delete(_X) -> ok.
-add_binding(_X, _B) -> ok.
-delete_binding(_X, _B) -> ok.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 2a0015bf..21764fce 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -56,7 +56,6 @@
-export([format_stderr/2]).
-export([start_applications/1, stop_applications/1]).
-export([unfold/2, ceil/1]).
--export([sort_field_table/1]).
-import(mnesia).
-import(lists).
@@ -490,7 +489,3 @@ ceil(N) ->
0 -> N;
_ -> 1 + T
end.
-
-%% Sorts a list of AMQP table fields as per the AMQP spec
-sort_field_table(Arguments) ->
- lists:keysort(1, Arguments).
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index afaf9d45..10f80cc3 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -30,15 +30,12 @@
%%
-module(rabbit_router).
--include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
-behaviour(gen_server2).
-export([start_link/0,
- deliver/2,
- match_bindings/2,
- match_routing_key/2]).
+ deliver/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -132,46 +129,6 @@ deliver_per_node(NodeQPids, Delivery) ->
-endif.
-%% TODO: Maybe this should be handled by a cursor instead.
-%% TODO: This causes a full scan for each entry with the same exchange
-match_bindings(Name, Match) ->
- Query = qlc:q([QName || #route{binding = Binding = #binding{
- exchange_name = ExchangeName,
- queue_name = QName}} <-
- mnesia:table(rabbit_route),
- ExchangeName == Name,
- Match(Binding)]),
- lookup_qpids(
- try
- mnesia:async_dirty(fun qlc:e/1, [Query])
- catch exit:{aborted, {badarg, _}} ->
- %% work around OTP-7025, which was fixed in R12B-1, by
- %% falling back on a less efficient method
- [QName || #route{binding = Binding = #binding{
- queue_name = QName}} <-
- mnesia:dirty_match_object(
- rabbit_route,
- #route{binding = #binding{exchange_name = Name,
- _ = '_'}}),
- Match(Binding)]
- end).
-
-match_routing_key(Name, RoutingKey) ->
- MatchHead = #route{binding = #binding{exchange_name = Name,
- queue_name = '$1',
- key = RoutingKey,
- _ = '_'}},
- lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])).
-
-lookup_qpids(Queues) ->
- sets:fold(
- fun(Key, Acc) ->
- case mnesia:dirty_read({rabbit_queue, Key}) of
- [#amqqueue{pid = QPid}] -> [QPid | Acc];
- [] -> Acc
- end
- end, [], sets:from_list(Queues)).
-
%%--------------------------------------------------------------------
init([]) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 99f1bc67..ba048184 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -302,7 +302,7 @@ test_topic_match(P, R) ->
test_topic_match(P, R, true).
test_topic_match(P, R, Expected) ->
- case rabbit_exchange_type_topic:topic_matches(list_to_binary(P), list_to_binary(R)) of
+ case rabbit_exchange:topic_matches(list_to_binary(P), list_to_binary(R)) of
Expected ->
passed;
_ ->