summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-12-18 16:55:53 +0000
committerMatthias Radestock <matthias@lshift.net>2009-12-18 16:55:53 +0000
commit84302310125ca9d71326cfea1297114f83b99ca2 (patch)
treef7528b25b07e021defd351da6b01edb1ef01a52d
parent976004c8a6fb2863a9861858da59fde72b7fbe00 (diff)
downloadrabbitmq-server-84302310125ca9d71326cfea1297114f83b99ca2.tar.gz
Backed out changeset 443c7b2d7e1
re-introducing pluggable exchange types on a branch
-rw-r--r--Makefile5
-rw-r--r--include/rabbit_exchange_behaviour_spec.hrl41
-rw-r--r--include/rabbit_framing_spec.hrl2
-rw-r--r--src/rabbit_access_control.erl12
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_exchange.erl293
-rw-r--r--src/rabbit_exchange_behaviour.erl50
-rw-r--r--src/rabbit_exchange_type_direct.erl53
-rw-r--r--src/rabbit_exchange_type_fanout.erl52
-rw-r--r--src/rabbit_exchange_type_headers.erl127
-rw-r--r--src/rabbit_exchange_type_topic.erl90
-rw-r--r--src/rabbit_misc.erl5
-rw-r--r--src/rabbit_router.erl45
-rw-r--r--src/rabbit_tests.erl2
15 files changed, 561 insertions, 220 deletions
diff --git a/Makefile b/Makefile
index db8f7001..1cd48df2 100644
--- a/Makefile
+++ b/Makefile
@@ -64,7 +64,10 @@ $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app
$(EBIN_DIR)/gen_server2.beam: $(SOURCE_DIR)/gen_server2.erl
erlc $(ERLC_OPTS) $<
-$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl $(EBIN_DIR)/gen_server2.beam
+$(EBIN_DIR)/rabbit_exchange_behaviour.beam: $(SOURCE_DIR)/rabbit_exchange_behaviour.erl
+ erlc $(ERLC_OPTS) $<
+
+$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl $(EBIN_DIR)/gen_server2.beam $(EBIN_DIR)/rabbit_exchange_behaviour.beam
erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
# ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
diff --git a/include/rabbit_exchange_behaviour_spec.hrl b/include/rabbit_exchange_behaviour_spec.hrl
new file mode 100644
index 00000000..30662af8
--- /dev/null
+++ b/include/rabbit_exchange_behaviour_spec.hrl
@@ -0,0 +1,41 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+-ifdef(use_specs).
+
+-spec(description/0 :: () -> [{atom(), any()}]).
+-spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}).
+-spec(recover/1 :: (exchange()) -> 'ok').
+-spec(init/1 :: (exchange()) -> 'ok').
+-spec(delete/1 :: (exchange()) -> 'ok').
+-spec(add_binding/2 :: (exchange(), binding()) -> 'ok').
+-spec(delete_binding/2 :: (exchange(), binding()) -> 'ok').
+
+-endif.
diff --git a/include/rabbit_framing_spec.hrl b/include/rabbit_framing_spec.hrl
index a78c2301..16af8ad3 100644
--- a/include/rabbit_framing_spec.hrl
+++ b/include/rabbit_framing_spec.hrl
@@ -56,5 +56,5 @@
-type(password() :: binary()).
-type(vhost() :: binary()).
-type(ctag() :: binary()).
--type(exchange_type() :: 'direct' | 'topic' | 'fanout').
+-type(exchange_type() :: atom()).
-type(binding_key() :: binary()).
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 6ff7a104..daf6f5af 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -242,12 +242,12 @@ add_vhost(VHostPath) ->
rabbit_misc:r(VHostPath, exchange, Name),
Type, true, false, []) ||
{Name,Type} <-
- [{<<"">>, direct},
- {<<"amq.direct">>, direct},
- {<<"amq.topic">>, topic},
- {<<"amq.match">>, headers}, %% per 0-9-1 pdf
- {<<"amq.headers">>, headers}, %% per 0-9-1 xml
- {<<"amq.fanout">>, fanout}]],
+ [{<<"">>, rabbit_exchange_type_direct},
+ {<<"amq.direct">>, rabbit_exchange_type_direct},
+ {<<"amq.topic">>, rabbit_exchange_type_topic},
+ {<<"amq.match">>, rabbit_exchange_type_headers}, %% per 0-9-1 pdf
+ {<<"amq.headers">>, rabbit_exchange_type_headers}, %% per 0-9-1 xml
+ {<<"amq.fanout">>, rabbit_exchange_type_fanout}]],
ok;
[_] ->
mnesia:abort({vhost_already_exists, VHostPath})
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index c20cb16c..507dab48 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1,4 +1,4 @@
-%% The contents of this file are subject to the Mozilla Public Licenses
+%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index b28574b7..297ed5aa 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -42,7 +42,7 @@
init([DefaultVHost]) ->
#exchange{} = rabbit_exchange:declare(
rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
- topic, true, false, []),
+ rabbit_exchange_type_topic, true, false, []),
{ok, #resource{virtual_host = DefaultVHost,
kind = exchange,
name = ?LOG_EXCH_NAME}}.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 33dea8c7..09ea1e96 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -30,7 +30,6 @@
%%
-module(rabbit_exchange).
--include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
@@ -40,7 +39,7 @@
-export([add_binding/4, delete_binding/4, list_bindings/1]).
-export([delete/2]).
-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
--export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]).
+-export([check_type/1, assert_type/2]).
%% EXTENDED API
-export([list_exchange_bindings/1]).
@@ -49,7 +48,6 @@
-import(mnesia).
-import(sets).
-import(lists).
--import(qlc).
-import(regexp).
%%----------------------------------------------------------------------------
@@ -83,8 +81,6 @@
[{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
-spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok').
-spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok').
--spec(topic_matches/2 :: (binary(), binary()) -> boolean()).
--spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()).
-spec(delete/2 :: (exchange_name(), boolean()) ->
'ok' | not_found() | {'error', 'in_use'}).
-spec(list_queue_bindings/1 :: (queue_name()) ->
@@ -109,7 +105,13 @@ recover() ->
Route, write),
ok = mnesia:write(rabbit_reverse_route,
ReverseRoute, write)
- end, rabbit_durable_route).
+ end, rabbit_durable_route),
+ %% Tell exchanges to recover themselves only *after* we've
+ %% recovered their bindings.
+ ok = rabbit_misc:table_foreach(
+ fun(Exchange = #exchange{type = Type}) ->
+ ok = Type:recover(Exchange)
+ end, rabbit_durable_exchange).
declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
Exchange = #exchange{name = ExchangeName,
@@ -126,22 +128,37 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
Exchange, write);
true -> ok
end,
+ ok = Type:init(Exchange),
Exchange;
[ExistingX] -> ExistingX
end
end).
-check_type(<<"fanout">>) ->
- fanout;
-check_type(<<"direct">>) ->
- direct;
-check_type(<<"topic">>) ->
- topic;
-check_type(<<"headers">>) ->
- headers;
+typename_to_plugin_module(T) when is_binary(T) ->
+ case catch list_to_existing_atom("rabbit_exchange_type_" ++ binary_to_list(T)) of
+ {'EXIT', {badarg, _}} ->
+ rabbit_misc:protocol_error(
+ command_invalid, "invalid exchange type '~s'", [T]);
+ Module ->
+ Module
+ end.
+
+plugin_module_to_typename(M) when is_atom(M) ->
+ "rabbit_exchange_type_" ++ S = atom_to_list(M),
+ list_to_binary(S).
+
check_type(T) ->
- rabbit_misc:protocol_error(
- command_invalid, "invalid exchange type '~s'", [T]).
+ Module = typename_to_plugin_module(T),
+ case catch Module:description() of
+ {'EXIT', {undef, [{_, description, []} | _]}} ->
+ rabbit_misc:protocol_error(
+ command_invalid, "invalid exchange type '~s'", [T]);
+ {'EXIT', _} ->
+ rabbit_misc:protocol_error(
+ command_invalid, "problem loading exchange type '~s'", [T]);
+ _ ->
+ Module
+ end.
assert_type(#exchange{ type = ActualType }, RequiredType)
when ActualType == RequiredType ->
@@ -149,7 +166,9 @@ assert_type(#exchange{ type = ActualType }, RequiredType)
assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) ->
rabbit_misc:protocol_error(
not_allowed, "cannot redeclare ~s of type '~s' with type '~s'",
- [rabbit_misc:rs(Name), ActualType, RequiredType]).
+ [rabbit_misc:rs(Name),
+ plugin_module_to_typename(ActualType),
+ plugin_module_to_typename(RequiredType)]).
lookup(Name) ->
rabbit_misc:dirty_read({rabbit_exchange, Name}).
@@ -173,7 +192,7 @@ map(VHostPath, F) ->
infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items].
i(name, #exchange{name = Name}) -> Name;
-i(type, #exchange{type = Type}) -> Type;
+i(type, #exchange{type = Type}) -> plugin_module_to_typename(Type);
i(durable, #exchange{durable = Durable}) -> Durable;
i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete;
i(arguments, #exchange{arguments = Arguments}) -> Arguments;
@@ -190,9 +209,8 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
publish(X, Delivery) ->
publish(X, [], Delivery).
-publish(X, Seen, Delivery = #delivery{
- message = #basic_message{routing_key = RK, content = C}}) ->
- case rabbit_router:deliver(route(X, RK, C), Delivery) of
+publish(X = #exchange{type = Type}, Seen, Delivery) ->
+ case Type:publish(X, Delivery) of
{_, []} = R ->
#exchange{name = XName, arguments = Args} = X,
case rabbit_misc:r_arg(XName, exchange, Args,
@@ -222,75 +240,6 @@ publish(X, Seen, Delivery = #delivery{
R
end.
-%% return the list of qpids to which a message with a given routing
-%% key, sent to a particular exchange, should be delivered.
-%%
-%% The function ensures that a qpid appears in the return list exactly
-%% as many times as a message should be delivered to it. With the
-%% current exchange types that is at most once.
-route(X = #exchange{type = topic}, RoutingKey, _Content) ->
- match_bindings(X, fun (#binding{key = BindingKey}) ->
- topic_matches(BindingKey, RoutingKey)
- end);
-
-route(X = #exchange{type = headers}, _RoutingKey, Content) ->
- Headers = case (Content#content.properties)#'P_basic'.headers of
- undefined -> [];
- H -> sort_arguments(H)
- end,
- match_bindings(X, fun (#binding{args = Spec}) ->
- headers_match(Spec, Headers)
- end);
-
-route(X = #exchange{type = fanout}, _RoutingKey, _Content) ->
- match_routing_key(X, '_');
-
-route(X = #exchange{type = direct}, RoutingKey, _Content) ->
- match_routing_key(X, RoutingKey).
-
-sort_arguments(Arguments) ->
- lists:keysort(1, Arguments).
-
-%% TODO: Maybe this should be handled by a cursor instead.
-%% TODO: This causes a full scan for each entry with the same exchange
-match_bindings(#exchange{name = Name}, Match) ->
- Query = qlc:q([QName || #route{binding = Binding = #binding{
- exchange_name = ExchangeName,
- queue_name = QName}} <-
- mnesia:table(rabbit_route),
- ExchangeName == Name,
- Match(Binding)]),
- lookup_qpids(
- try
- mnesia:async_dirty(fun qlc:e/1, [Query])
- catch exit:{aborted, {badarg, _}} ->
- %% work around OTP-7025, which was fixed in R12B-1, by
- %% falling back on a less efficient method
- [QName || #route{binding = Binding = #binding{
- queue_name = QName}} <-
- mnesia:dirty_match_object(
- rabbit_route,
- #route{binding = #binding{exchange_name = Name,
- _ = '_'}}),
- Match(Binding)]
- end).
-
-match_routing_key(#exchange{name = Name}, RoutingKey) ->
- MatchHead = #route{binding = #binding{exchange_name = Name,
- queue_name = '$1',
- key = RoutingKey,
- _ = '_'}},
- lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])).
-
-lookup_qpids(Queues) ->
- sets:fold(
- fun(Key, Acc) ->
- case mnesia:dirty_read({rabbit_queue, Key}) of
- [#amqqueue{pid = QPid}] -> [QPid | Acc];
- [] -> Acc
- end
- end, [], sets:from_list(Queues)).
-
%% TODO: Should all of the route and binding management not be
%% refactored to its own module, especially seeing as unbind will have
%% to be implemented for 0.91 ?
@@ -314,21 +263,43 @@ delete_transient_queue_bindings(QueueName) ->
delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1).
delete_queue_bindings(QueueName, FwdDeleteFun) ->
- Exchanges = exchanges_for_queue(QueueName),
- [begin
- ok = FwdDeleteFun(reverse_route(Route)),
- ok = mnesia:delete_object(rabbit_reverse_route, Route, write)
- end || Route <- mnesia:match_object(
- rabbit_reverse_route,
- reverse_route(
- #route{binding = #binding{queue_name = QueueName,
- _ = '_'}}),
- write)],
- [begin
- [X] = mnesia:read({rabbit_exchange, ExchangeName}),
- ok = maybe_auto_delete(X)
- end || ExchangeName <- Exchanges],
- ok.
+ DeletedBindings =
+ [begin
+ FwdRoute = reverse_route(Route),
+ ok = FwdDeleteFun(FwdRoute),
+ ok = mnesia:delete_object(rabbit_reverse_route, Route, write),
+ FwdRoute#route.binding
+ end || Route <- mnesia:match_object(
+ rabbit_reverse_route,
+ reverse_route(
+ #route{binding = #binding{queue_name = QueueName,
+ _ = '_'}}),
+ write)],
+ %% We need the keysort to group the bindings by exchange name, so
+ %% that cleanup_deleted_queue_bindings can inform the exchange of
+ %% its vanished bindings before maybe_auto_delete'ing the
+ %% exchange.
+ ok = cleanup_deleted_queue_bindings(lists:keysort(#binding.exchange_name, DeletedBindings),
+ none, []).
+
+%% Requires that its input binding list is sorted in exchange-name
+%% order, so that the grouping of bindings (for passing to
+%% cleanup_deleted_queue_bindings1) works properly.
+cleanup_deleted_queue_bindings([], ExchangeName, Bindings) ->
+ cleanup_deleted_queue_bindings1(ExchangeName, Bindings);
+cleanup_deleted_queue_bindings([B = #binding{exchange_name = N} | Rest], ExchangeName, Bindings)
+ when N =:= ExchangeName ->
+ cleanup_deleted_queue_bindings(Rest, ExchangeName, [B | Bindings]);
+cleanup_deleted_queue_bindings([B = #binding{exchange_name = N} | Rest], ExchangeName, Bindings) ->
+ cleanup_deleted_queue_bindings1(ExchangeName, Bindings),
+ cleanup_deleted_queue_bindings(Rest, N, [B]).
+
+cleanup_deleted_queue_bindings1(none, []) ->
+ ok;
+cleanup_deleted_queue_bindings1(ExchangeName, Bindings) ->
+ [X = #exchange{type = Type}] = mnesia:read({rabbit_exchange, ExchangeName}),
+ [ok = Type:delete_binding(X, B) || B <- Bindings],
+ ok = maybe_auto_delete(X).
delete_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write),
@@ -337,15 +308,6 @@ delete_forward_routes(Route) ->
delete_transient_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write).
-exchanges_for_queue(QueueName) ->
- MatchHead = reverse_route(
- #route{binding = #binding{exchange_name = '$1',
- queue_name = QueueName,
- _ = '_'}}),
- sets:to_list(
- sets:from_list(
- mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))).
-
contains(Table, MatchHead) ->
try
continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read))
@@ -384,23 +346,25 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) ->
add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
binding_action(
ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X, Q, B) ->
+ fun (X = #exchange{type = Type}, Q, B) ->
if Q#amqqueue.durable and not(X#exchange.durable) ->
{error, durability_settings_incompatible};
true -> ok = sync_binding(B, Q#amqqueue.durable,
- fun mnesia:write/3)
+ fun mnesia:write/3),
+ ok = Type:add_binding(X, B)
end
end).
delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
binding_action(
ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X, Q, B) ->
+ fun (X = #exchange{type = Type}, Q, B) ->
case mnesia:match_object(rabbit_route, #route{binding = B},
write) of
[] -> {error, binding_not_found};
_ -> ok = sync_binding(B, Q#amqqueue.durable,
fun mnesia:delete_object/3),
+ ok = Type:delete_binding(X, B),
maybe_auto_delete(X)
end
end).
@@ -412,7 +376,7 @@ binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) ->
Fun(X, Q, #binding{exchange_name = ExchangeName,
queue_name = QueueName,
key = RoutingKey,
- args = sort_arguments(Arguments)})
+ args = rabbit_misc:sort_field_table(Arguments)})
end).
sync_binding(Binding, Durable, Fun) ->
@@ -470,94 +434,6 @@ reverse_binding(#binding{exchange_name = Exchange,
key = Key,
args = Args}.
-default_headers_match_kind() -> all.
-
-parse_x_match(<<"all">>) -> all;
-parse_x_match(<<"any">>) -> any;
-parse_x_match(Other) ->
- rabbit_log:warning("Invalid x-match field value ~p; expected all or any",
- [Other]),
- default_headers_match_kind().
-
-%% Horrendous matching algorithm. Depends for its merge-like
-%% (linear-time) behaviour on the lists:keysort (sort_arguments) that
-%% route/3 and {add,delete}_binding/4 do.
-%%
-%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
-%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
-%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
-%%
-headers_match(Pattern, Data) ->
- MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of
- {value, {_, longstr, MK}} -> parse_x_match(MK);
- {value, {_, Type, MK}} ->
- rabbit_log:warning("Invalid x-match field type ~p "
- "(value ~p); expected longstr",
- [Type, MK]),
- default_headers_match_kind();
- _ -> default_headers_match_kind()
- end,
- headers_match(Pattern, Data, true, false, MatchKind).
-
-headers_match([], _Data, AllMatch, _AnyMatch, all) ->
- AllMatch;
-headers_match([], _Data, _AllMatch, AnyMatch, any) ->
- AnyMatch;
-headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data,
- AllMatch, AnyMatch, MatchKind) ->
- headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind);
-headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) ->
- headers_match([], [], false, AnyMatch, MatchKind);
-headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest],
- AllMatch, AnyMatch, MatchKind) when PK > DK ->
- headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind);
-headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _],
- _AllMatch, AnyMatch, MatchKind) when PK < DK ->
- headers_match(PRest, Data, false, AnyMatch, MatchKind);
-headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
- AllMatch, AnyMatch, MatchKind) when PK == DK ->
- {AllMatch1, AnyMatch1} =
- if
- %% It's not properly specified, but a "no value" in a
- %% pattern field is supposed to mean simple presence of
- %% the corresponding data field. I've interpreted that to
- %% mean a type of "void" for the pattern field.
- PT == void -> {AllMatch, true};
- %% Similarly, it's not specified, but I assume that a
- %% mismatched type causes a mismatched value.
- PT =/= DT -> {false, AnyMatch};
- PV == DV -> {AllMatch, true};
- true -> {false, AnyMatch}
- end,
- headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind).
-
-split_topic_key(Key) ->
- {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."),
- KeySplit.
-
-topic_matches(PatternKey, RoutingKey) ->
- P = split_topic_key(PatternKey),
- R = split_topic_key(RoutingKey),
- topic_matches1(P, R).
-
-topic_matches1(["#"], _R) ->
- true;
-topic_matches1(["#" | PTail], R) ->
- last_topic_match(PTail, [], lists:reverse(R));
-topic_matches1([], []) ->
- true;
-topic_matches1(["*" | PatRest], [_ | ValRest]) ->
- topic_matches1(PatRest, ValRest);
-topic_matches1([PatElement | PatRest], [ValElement | ValRest]) when PatElement == ValElement ->
- topic_matches1(PatRest, ValRest);
-topic_matches1(_, _) ->
- false.
-
-last_topic_match(P, R, []) ->
- topic_matches1(P, R);
-last_topic_match(P, R, [BacktrackNext | BacktrackList]) ->
- topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList).
-
delete(ExchangeName, _IfUnused = true) ->
call_with_exchange(ExchangeName, fun conditional_delete/1);
delete(ExchangeName, _IfUnused = false) ->
@@ -579,10 +455,11 @@ conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
true -> {error, in_use}
end.
-unconditional_delete(#exchange{name = ExchangeName}) ->
+unconditional_delete(X = #exchange{name = ExchangeName, type = Type}) ->
ok = delete_exchange_bindings(ExchangeName),
ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}),
- ok = mnesia:delete({rabbit_exchange, ExchangeName}).
+ ok = mnesia:delete({rabbit_exchange, ExchangeName}),
+ ok = Type:delete(X).
%%----------------------------------------------------------------------------
%% EXTENDED API
diff --git a/src/rabbit_exchange_behaviour.erl b/src/rabbit_exchange_behaviour.erl
new file mode 100644
index 00000000..7935df6b
--- /dev/null
+++ b/src/rabbit_exchange_behaviour.erl
@@ -0,0 +1,50 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_exchange_behaviour).
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [
+ %% Called *outside* mnesia transactions.
+ {description, 0},
+ {publish, 2},
+
+ %% Called *inside* mnesia transactions, must be idempotent.
+ {recover, 1}, %% like init, but called on server startup for durable exchanges
+ {init, 1}, %% like recover, but called on declaration when previously absent
+ {delete, 1}, %% called on deletion
+ {add_binding, 2},
+ {delete_binding, 2}
+ ];
+behaviour_info(_Other) ->
+ undefined.
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
new file mode 100644
index 00000000..e6e6ae99
--- /dev/null
+++ b/src/rabbit_exchange_type_direct.erl
@@ -0,0 +1,53 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_exchange_type_direct).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_exchange_behaviour).
+
+-export([description/0, publish/2]).
+-export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]).
+-include("rabbit_exchange_behaviour_spec.hrl").
+
+description() ->
+ [{name, <<"direct">>},
+ {description, <<"AMQP direct exchange, as per the AMQP specification">>}].
+
+publish(#exchange{name = Name},
+ Delivery = #delivery{message = #basic_message{routing_key = RoutingKey}}) ->
+ rabbit_router:deliver(rabbit_router:match_routing_key(Name, RoutingKey), Delivery).
+
+recover(_X) -> ok.
+init(_X) -> ok.
+delete(_X) -> ok.
+add_binding(_X, _B) -> ok.
+delete_binding(_X, _B) -> ok.
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
new file mode 100644
index 00000000..2194abd4
--- /dev/null
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -0,0 +1,52 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_exchange_type_fanout).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_exchange_behaviour).
+
+-export([description/0, publish/2]).
+-export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]).
+-include("rabbit_exchange_behaviour_spec.hrl").
+
+description() ->
+ [{name, <<"fanout">>},
+ {description, <<"AMQP fanout exchange, as per the AMQP specification">>}].
+
+publish(#exchange{name = Name}, Delivery) ->
+ rabbit_router:deliver(rabbit_router:match_routing_key(Name, '_'), Delivery).
+
+recover(_X) -> ok.
+init(_X) -> ok.
+delete(_X) -> ok.
+add_binding(_X, _B) -> ok.
+delete_binding(_X, _B) -> ok.
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
new file mode 100644
index 00000000..72c85b06
--- /dev/null
+++ b/src/rabbit_exchange_type_headers.erl
@@ -0,0 +1,127 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_exchange_type_headers).
+-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
+
+-behaviour(rabbit_exchange_behaviour).
+
+-export([description/0, publish/2]).
+-export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]).
+-include("rabbit_exchange_behaviour_spec.hrl").
+
+-ifdef(use_specs).
+-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()).
+-endif.
+
+description() ->
+ [{name, <<"headers">>},
+ {description, <<"AMQP headers exchange, as per the AMQP specification">>}].
+
+publish(#exchange{name = Name},
+ Delivery = #delivery{message = #basic_message{content = Content}}) ->
+ Headers = case (Content#content.properties)#'P_basic'.headers of
+ undefined -> [];
+ H -> rabbit_misc:sort_field_table(H)
+ end,
+ rabbit_router:deliver(rabbit_router:match_bindings(Name, fun (#binding{args = Spec}) ->
+ headers_match(Spec, Headers)
+ end),
+ Delivery).
+
+default_headers_match_kind() -> all.
+
+parse_x_match(<<"all">>) -> all;
+parse_x_match(<<"any">>) -> any;
+parse_x_match(Other) ->
+ rabbit_log:warning("Invalid x-match field value ~p; expected all or any",
+ [Other]),
+ default_headers_match_kind().
+
+%% Horrendous matching algorithm. Depends for its merge-like
+%% (linear-time) behaviour on the lists:keysort
+%% (rabbit_misc:sort_field_table) that route/3 and
+%% rabbit_exchange:{add,delete}_binding/4 do.
+%%
+%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
+%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+%%
+headers_match(Pattern, Data) ->
+ MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of
+ {value, {_, longstr, MK}} -> parse_x_match(MK);
+ {value, {_, Type, MK}} ->
+ rabbit_log:warning("Invalid x-match field type ~p "
+ "(value ~p); expected longstr",
+ [Type, MK]),
+ default_headers_match_kind();
+ _ -> default_headers_match_kind()
+ end,
+ headers_match(Pattern, Data, true, false, MatchKind).
+
+headers_match([], _Data, AllMatch, _AnyMatch, all) ->
+ AllMatch;
+headers_match([], _Data, _AllMatch, AnyMatch, any) ->
+ AnyMatch;
+headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data,
+ AllMatch, AnyMatch, MatchKind) ->
+ headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind);
+headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) ->
+ headers_match([], [], false, AnyMatch, MatchKind);
+headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest],
+ AllMatch, AnyMatch, MatchKind) when PK > DK ->
+ headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind);
+headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _],
+ _AllMatch, AnyMatch, MatchKind) when PK < DK ->
+ headers_match(PRest, Data, false, AnyMatch, MatchKind);
+headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
+ AllMatch, AnyMatch, MatchKind) when PK == DK ->
+ {AllMatch1, AnyMatch1} =
+ if
+ %% It's not properly specified, but a "no value" in a
+ %% pattern field is supposed to mean simple presence of
+ %% the corresponding data field. I've interpreted that to
+ %% mean a type of "void" for the pattern field.
+ PT == void -> {AllMatch, true};
+ %% Similarly, it's not specified, but I assume that a
+ %% mismatched type causes a mismatched value.
+ PT =/= DT -> {false, AnyMatch};
+ PV == DV -> {AllMatch, true};
+ true -> {false, AnyMatch}
+ end,
+ headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind).
+
+recover(_X) -> ok.
+init(_X) -> ok.
+delete(_X) -> ok.
+add_binding(_X, _B) -> ok.
+delete_binding(_X, _B) -> ok.
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
new file mode 100644
index 00000000..738ff595
--- /dev/null
+++ b/src/rabbit_exchange_type_topic.erl
@@ -0,0 +1,90 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_exchange_type_topic).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_exchange_behaviour).
+
+-export([description/0, publish/2]).
+-export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]).
+-include("rabbit_exchange_behaviour_spec.hrl").
+
+-export([topic_matches/2]).
+
+-ifdef(use_specs).
+-spec(topic_matches/2 :: (binary(), binary()) -> boolean()).
+-endif.
+
+description() ->
+ [{name, <<"topic">>},
+ {description, <<"AMQP topic exchange, as per the AMQP specification">>}].
+
+publish(#exchange{name = Name},
+ Delivery = #delivery{message = #basic_message{routing_key = RoutingKey}}) ->
+ rabbit_router:deliver(rabbit_router:match_bindings(Name,
+ fun (#binding{key = BindingKey}) ->
+ topic_matches(BindingKey, RoutingKey)
+ end),
+ Delivery).
+
+split_topic_key(Key) ->
+ {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."),
+ KeySplit.
+
+topic_matches(PatternKey, RoutingKey) ->
+ P = split_topic_key(PatternKey),
+ R = split_topic_key(RoutingKey),
+ topic_matches1(P, R).
+
+topic_matches1(["#"], _R) ->
+ true;
+topic_matches1(["#" | PTail], R) ->
+ last_topic_match(PTail, [], lists:reverse(R));
+topic_matches1([], []) ->
+ true;
+topic_matches1(["*" | PatRest], [_ | ValRest]) ->
+ topic_matches1(PatRest, ValRest);
+topic_matches1([PatElement | PatRest], [ValElement | ValRest]) when PatElement == ValElement ->
+ topic_matches1(PatRest, ValRest);
+topic_matches1(_, _) ->
+ false.
+
+last_topic_match(P, R, []) ->
+ topic_matches1(P, R);
+last_topic_match(P, R, [BacktrackNext | BacktrackList]) ->
+ topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList).
+
+recover(_X) -> ok.
+init(_X) -> ok.
+delete(_X) -> ok.
+add_binding(_X, _B) -> ok.
+delete_binding(_X, _B) -> ok.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 21764fce..2a0015bf 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -56,6 +56,7 @@
-export([format_stderr/2]).
-export([start_applications/1, stop_applications/1]).
-export([unfold/2, ceil/1]).
+-export([sort_field_table/1]).
-import(mnesia).
-import(lists).
@@ -489,3 +490,7 @@ ceil(N) ->
0 -> N;
_ -> 1 + T
end.
+
+%% Sorts a list of AMQP table fields as per the AMQP spec
+sort_field_table(Arguments) ->
+ lists:keysort(1, Arguments).
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 10f80cc3..afaf9d45 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -30,12 +30,15 @@
%%
-module(rabbit_router).
+-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
-behaviour(gen_server2).
-export([start_link/0,
- deliver/2]).
+ deliver/2,
+ match_bindings/2,
+ match_routing_key/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -129,6 +132,46 @@ deliver_per_node(NodeQPids, Delivery) ->
-endif.
+%% TODO: Maybe this should be handled by a cursor instead.
+%% TODO: This causes a full scan for each entry with the same exchange
+match_bindings(Name, Match) ->
+ Query = qlc:q([QName || #route{binding = Binding = #binding{
+ exchange_name = ExchangeName,
+ queue_name = QName}} <-
+ mnesia:table(rabbit_route),
+ ExchangeName == Name,
+ Match(Binding)]),
+ lookup_qpids(
+ try
+ mnesia:async_dirty(fun qlc:e/1, [Query])
+ catch exit:{aborted, {badarg, _}} ->
+ %% work around OTP-7025, which was fixed in R12B-1, by
+ %% falling back on a less efficient method
+ [QName || #route{binding = Binding = #binding{
+ queue_name = QName}} <-
+ mnesia:dirty_match_object(
+ rabbit_route,
+ #route{binding = #binding{exchange_name = Name,
+ _ = '_'}}),
+ Match(Binding)]
+ end).
+
+match_routing_key(Name, RoutingKey) ->
+ MatchHead = #route{binding = #binding{exchange_name = Name,
+ queue_name = '$1',
+ key = RoutingKey,
+ _ = '_'}},
+ lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])).
+
+lookup_qpids(Queues) ->
+ sets:fold(
+ fun(Key, Acc) ->
+ case mnesia:dirty_read({rabbit_queue, Key}) of
+ [#amqqueue{pid = QPid}] -> [QPid | Acc];
+ [] -> Acc
+ end
+ end, [], sets:from_list(Queues)).
+
%%--------------------------------------------------------------------
init([]) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index ba048184..99f1bc67 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -302,7 +302,7 @@ test_topic_match(P, R) ->
test_topic_match(P, R, true).
test_topic_match(P, R, Expected) ->
- case rabbit_exchange:topic_matches(list_to_binary(P), list_to_binary(R)) of
+ case rabbit_exchange_type_topic:topic_matches(list_to_binary(P), list_to_binary(R)) of
Expected ->
passed;
_ ->