diff options
author | Matthias Radestock <matthias@lshift.net> | 2009-12-18 14:23:25 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2009-12-18 14:23:25 +0000 |
commit | 8231a842eac1e6e02ddad536a9eef625deb90d19 (patch) | |
tree | 652e1040de41de687f42e18f48c644d866887f5d | |
parent | d06ebb6bd833b4c1c37f2f5d52daa2a6149ceefb (diff) | |
download | rabbitmq-server-8231a842eac1e6e02ddad536a9eef625deb90d19.tar.gz |
Backed out changeset 71e93e17450c
pluggable exchange types are not ready for prime time yet
-rw-r--r-- | Makefile | 5 | ||||
-rw-r--r-- | include/rabbit_exchange_behaviour_spec.hrl | 41 | ||||
-rw-r--r-- | include/rabbit_framing_spec.hrl | 2 | ||||
-rw-r--r-- | src/rabbit_access_control.erl | 12 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 2 | ||||
-rw-r--r-- | src/rabbit_error_logger.erl | 2 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 293 | ||||
-rw-r--r-- | src/rabbit_exchange_behaviour.erl | 50 | ||||
-rw-r--r-- | src/rabbit_exchange_type_direct.erl | 53 | ||||
-rw-r--r-- | src/rabbit_exchange_type_fanout.erl | 52 | ||||
-rw-r--r-- | src/rabbit_exchange_type_headers.erl | 127 | ||||
-rw-r--r-- | src/rabbit_exchange_type_topic.erl | 90 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 5 | ||||
-rw-r--r-- | src/rabbit_router.erl | 45 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 2 |
15 files changed, 220 insertions, 561 deletions
@@ -51,10 +51,7 @@ $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app $(EBIN_DIR)/gen_server2.beam: $(SOURCE_DIR)/gen_server2.erl erlc $(ERLC_OPTS) $< -$(EBIN_DIR)/rabbit_exchange_behaviour.beam: $(SOURCE_DIR)/rabbit_exchange_behaviour.erl - erlc $(ERLC_OPTS) $< - -$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl $(EBIN_DIR)/gen_server2.beam $(EBIN_DIR)/rabbit_exchange_behaviour.beam +$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl $(EBIN_DIR)/gen_server2.beam erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< # ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< diff --git a/include/rabbit_exchange_behaviour_spec.hrl b/include/rabbit_exchange_behaviour_spec.hrl deleted file mode 100644 index 30662af8..00000000 --- a/include/rabbit_exchange_behaviour_spec.hrl +++ /dev/null @@ -1,41 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% --ifdef(use_specs). - --spec(description/0 :: () -> [{atom(), any()}]). --spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}). --spec(recover/1 :: (exchange()) -> 'ok'). --spec(init/1 :: (exchange()) -> 'ok'). --spec(delete/1 :: (exchange()) -> 'ok'). --spec(add_binding/2 :: (exchange(), binding()) -> 'ok'). --spec(delete_binding/2 :: (exchange(), binding()) -> 'ok'). - --endif. diff --git a/include/rabbit_framing_spec.hrl b/include/rabbit_framing_spec.hrl index 16af8ad3..a78c2301 100644 --- a/include/rabbit_framing_spec.hrl +++ b/include/rabbit_framing_spec.hrl @@ -56,5 +56,5 @@ -type(password() :: binary()). -type(vhost() :: binary()). -type(ctag() :: binary()). --type(exchange_type() :: atom()). +-type(exchange_type() :: 'direct' | 'topic' | 'fanout'). -type(binding_key() :: binary()). diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index daf6f5af..6ff7a104 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -242,12 +242,12 @@ add_vhost(VHostPath) -> rabbit_misc:r(VHostPath, exchange, Name), Type, true, false, []) || {Name,Type} <- - [{<<"">>, rabbit_exchange_type_direct}, - {<<"amq.direct">>, rabbit_exchange_type_direct}, - {<<"amq.topic">>, rabbit_exchange_type_topic}, - {<<"amq.match">>, rabbit_exchange_type_headers}, %% per 0-9-1 pdf - {<<"amq.headers">>, rabbit_exchange_type_headers}, %% per 0-9-1 xml - {<<"amq.fanout">>, rabbit_exchange_type_fanout}]], + [{<<"">>, direct}, + {<<"amq.direct">>, direct}, + {<<"amq.topic">>, topic}, + {<<"amq.match">>, headers}, %% per 0-9-1 pdf + {<<"amq.headers">>, headers}, %% per 0-9-1 xml + {<<"amq.fanout">>, fanout}]], ok; [_] -> mnesia:abort({vhost_already_exists, VHostPath}) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 507dab48..c20cb16c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1,4 +1,4 @@ -%% The contents of this file are subject to the Mozilla Public License +%% The contents of this file are subject to the Mozilla Public Licenses %% Version 1.1 (the "License"); you may not use this file except in %% compliance with the License. You may obtain a copy of the License at %% http://www.mozilla.org/MPL/ diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 297ed5aa..b28574b7 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -42,7 +42,7 @@ init([DefaultVHost]) -> #exchange{} = rabbit_exchange:declare( rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME), - rabbit_exchange_type_topic, true, false, []), + topic, true, false, []), {ok, #resource{virtual_host = DefaultVHost, kind = exchange, name = ?LOG_EXCH_NAME}}. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 09ea1e96..33dea8c7 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -30,6 +30,7 @@ %% -module(rabbit_exchange). +-include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). -include("rabbit_framing.hrl"). @@ -39,7 +40,7 @@ -export([add_binding/4, delete_binding/4, list_bindings/1]). -export([delete/2]). -export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). --export([check_type/1, assert_type/2]). +-export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]). %% EXTENDED API -export([list_exchange_bindings/1]). @@ -48,6 +49,7 @@ -import(mnesia). -import(sets). -import(lists). +-import(qlc). -import(regexp). %%---------------------------------------------------------------------------- @@ -81,6 +83,8 @@ [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). -spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok'). -spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok'). +-spec(topic_matches/2 :: (binary(), binary()) -> boolean()). +-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()). -spec(delete/2 :: (exchange_name(), boolean()) -> 'ok' | not_found() | {'error', 'in_use'}). -spec(list_queue_bindings/1 :: (queue_name()) -> @@ -105,13 +109,7 @@ recover() -> Route, write), ok = mnesia:write(rabbit_reverse_route, ReverseRoute, write) - end, rabbit_durable_route), - %% Tell exchanges to recover themselves only *after* we've - %% recovered their bindings. - ok = rabbit_misc:table_foreach( - fun(Exchange = #exchange{type = Type}) -> - ok = Type:recover(Exchange) - end, rabbit_durable_exchange). + end, rabbit_durable_route). declare(ExchangeName, Type, Durable, AutoDelete, Args) -> Exchange = #exchange{name = ExchangeName, @@ -128,37 +126,22 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) -> Exchange, write); true -> ok end, - ok = Type:init(Exchange), Exchange; [ExistingX] -> ExistingX end end). -typename_to_plugin_module(T) when is_binary(T) -> - case catch list_to_existing_atom("rabbit_exchange_type_" ++ binary_to_list(T)) of - {'EXIT', {badarg, _}} -> - rabbit_misc:protocol_error( - command_invalid, "invalid exchange type '~s'", [T]); - Module -> - Module - end. - -plugin_module_to_typename(M) when is_atom(M) -> - "rabbit_exchange_type_" ++ S = atom_to_list(M), - list_to_binary(S). - +check_type(<<"fanout">>) -> + fanout; +check_type(<<"direct">>) -> + direct; +check_type(<<"topic">>) -> + topic; +check_type(<<"headers">>) -> + headers; check_type(T) -> - Module = typename_to_plugin_module(T), - case catch Module:description() of - {'EXIT', {undef, [{_, description, []} | _]}} -> - rabbit_misc:protocol_error( - command_invalid, "invalid exchange type '~s'", [T]); - {'EXIT', _} -> - rabbit_misc:protocol_error( - command_invalid, "problem loading exchange type '~s'", [T]); - _ -> - Module - end. + rabbit_misc:protocol_error( + command_invalid, "invalid exchange type '~s'", [T]). assert_type(#exchange{ type = ActualType }, RequiredType) when ActualType == RequiredType -> @@ -166,9 +149,7 @@ assert_type(#exchange{ type = ActualType }, RequiredType) assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) -> rabbit_misc:protocol_error( not_allowed, "cannot redeclare ~s of type '~s' with type '~s'", - [rabbit_misc:rs(Name), - plugin_module_to_typename(ActualType), - plugin_module_to_typename(RequiredType)]). + [rabbit_misc:rs(Name), ActualType, RequiredType]). lookup(Name) -> rabbit_misc:dirty_read({rabbit_exchange, Name}). @@ -192,7 +173,7 @@ map(VHostPath, F) -> infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items]. i(name, #exchange{name = Name}) -> Name; -i(type, #exchange{type = Type}) -> plugin_module_to_typename(Type); +i(type, #exchange{type = Type}) -> Type; i(durable, #exchange{durable = Durable}) -> Durable; i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete; i(arguments, #exchange{arguments = Arguments}) -> Arguments; @@ -209,8 +190,9 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). publish(X, Delivery) -> publish(X, [], Delivery). -publish(X = #exchange{type = Type}, Seen, Delivery) -> - case Type:publish(X, Delivery) of +publish(X, Seen, Delivery = #delivery{ + message = #basic_message{routing_key = RK, content = C}}) -> + case rabbit_router:deliver(route(X, RK, C), Delivery) of {_, []} = R -> #exchange{name = XName, arguments = Args} = X, case rabbit_misc:r_arg(XName, exchange, Args, @@ -240,6 +222,75 @@ publish(X = #exchange{type = Type}, Seen, Delivery) -> R end. +%% return the list of qpids to which a message with a given routing +%% key, sent to a particular exchange, should be delivered. +%% +%% The function ensures that a qpid appears in the return list exactly +%% as many times as a message should be delivered to it. With the +%% current exchange types that is at most once. +route(X = #exchange{type = topic}, RoutingKey, _Content) -> + match_bindings(X, fun (#binding{key = BindingKey}) -> + topic_matches(BindingKey, RoutingKey) + end); + +route(X = #exchange{type = headers}, _RoutingKey, Content) -> + Headers = case (Content#content.properties)#'P_basic'.headers of + undefined -> []; + H -> sort_arguments(H) + end, + match_bindings(X, fun (#binding{args = Spec}) -> + headers_match(Spec, Headers) + end); + +route(X = #exchange{type = fanout}, _RoutingKey, _Content) -> + match_routing_key(X, '_'); + +route(X = #exchange{type = direct}, RoutingKey, _Content) -> + match_routing_key(X, RoutingKey). + +sort_arguments(Arguments) -> + lists:keysort(1, Arguments). + +%% TODO: Maybe this should be handled by a cursor instead. +%% TODO: This causes a full scan for each entry with the same exchange +match_bindings(#exchange{name = Name}, Match) -> + Query = qlc:q([QName || #route{binding = Binding = #binding{ + exchange_name = ExchangeName, + queue_name = QName}} <- + mnesia:table(rabbit_route), + ExchangeName == Name, + Match(Binding)]), + lookup_qpids( + try + mnesia:async_dirty(fun qlc:e/1, [Query]) + catch exit:{aborted, {badarg, _}} -> + %% work around OTP-7025, which was fixed in R12B-1, by + %% falling back on a less efficient method + [QName || #route{binding = Binding = #binding{ + queue_name = QName}} <- + mnesia:dirty_match_object( + rabbit_route, + #route{binding = #binding{exchange_name = Name, + _ = '_'}}), + Match(Binding)] + end). + +match_routing_key(#exchange{name = Name}, RoutingKey) -> + MatchHead = #route{binding = #binding{exchange_name = Name, + queue_name = '$1', + key = RoutingKey, + _ = '_'}}, + lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). + +lookup_qpids(Queues) -> + sets:fold( + fun(Key, Acc) -> + case mnesia:dirty_read({rabbit_queue, Key}) of + [#amqqueue{pid = QPid}] -> [QPid | Acc]; + [] -> Acc + end + end, [], sets:from_list(Queues)). + %% TODO: Should all of the route and binding management not be %% refactored to its own module, especially seeing as unbind will have %% to be implemented for 0.91 ? @@ -263,43 +314,21 @@ delete_transient_queue_bindings(QueueName) -> delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1). delete_queue_bindings(QueueName, FwdDeleteFun) -> - DeletedBindings = - [begin - FwdRoute = reverse_route(Route), - ok = FwdDeleteFun(FwdRoute), - ok = mnesia:delete_object(rabbit_reverse_route, Route, write), - FwdRoute#route.binding - end || Route <- mnesia:match_object( - rabbit_reverse_route, - reverse_route( - #route{binding = #binding{queue_name = QueueName, - _ = '_'}}), - write)], - %% We need the keysort to group the bindings by exchange name, so - %% that cleanup_deleted_queue_bindings can inform the exchange of - %% its vanished bindings before maybe_auto_delete'ing the - %% exchange. - ok = cleanup_deleted_queue_bindings(lists:keysort(#binding.exchange_name, DeletedBindings), - none, []). - -%% Requires that its input binding list is sorted in exchange-name -%% order, so that the grouping of bindings (for passing to -%% cleanup_deleted_queue_bindings1) works properly. -cleanup_deleted_queue_bindings([], ExchangeName, Bindings) -> - cleanup_deleted_queue_bindings1(ExchangeName, Bindings); -cleanup_deleted_queue_bindings([B = #binding{exchange_name = N} | Rest], ExchangeName, Bindings) - when N =:= ExchangeName -> - cleanup_deleted_queue_bindings(Rest, ExchangeName, [B | Bindings]); -cleanup_deleted_queue_bindings([B = #binding{exchange_name = N} | Rest], ExchangeName, Bindings) -> - cleanup_deleted_queue_bindings1(ExchangeName, Bindings), - cleanup_deleted_queue_bindings(Rest, N, [B]). - -cleanup_deleted_queue_bindings1(none, []) -> - ok; -cleanup_deleted_queue_bindings1(ExchangeName, Bindings) -> - [X = #exchange{type = Type}] = mnesia:read({rabbit_exchange, ExchangeName}), - [ok = Type:delete_binding(X, B) || B <- Bindings], - ok = maybe_auto_delete(X). + Exchanges = exchanges_for_queue(QueueName), + [begin + ok = FwdDeleteFun(reverse_route(Route)), + ok = mnesia:delete_object(rabbit_reverse_route, Route, write) + end || Route <- mnesia:match_object( + rabbit_reverse_route, + reverse_route( + #route{binding = #binding{queue_name = QueueName, + _ = '_'}}), + write)], + [begin + [X] = mnesia:read({rabbit_exchange, ExchangeName}), + ok = maybe_auto_delete(X) + end || ExchangeName <- Exchanges], + ok. delete_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write), @@ -308,6 +337,15 @@ delete_forward_routes(Route) -> delete_transient_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write). +exchanges_for_queue(QueueName) -> + MatchHead = reverse_route( + #route{binding = #binding{exchange_name = '$1', + queue_name = QueueName, + _ = '_'}}), + sets:to_list( + sets:from_list( + mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))). + contains(Table, MatchHead) -> try continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)) @@ -346,25 +384,23 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) -> add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> binding_action( ExchangeName, QueueName, RoutingKey, Arguments, - fun (X = #exchange{type = Type}, Q, B) -> + fun (X, Q, B) -> if Q#amqqueue.durable and not(X#exchange.durable) -> {error, durability_settings_incompatible}; true -> ok = sync_binding(B, Q#amqqueue.durable, - fun mnesia:write/3), - ok = Type:add_binding(X, B) + fun mnesia:write/3) end end). delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> binding_action( ExchangeName, QueueName, RoutingKey, Arguments, - fun (X = #exchange{type = Type}, Q, B) -> + fun (X, Q, B) -> case mnesia:match_object(rabbit_route, #route{binding = B}, write) of [] -> {error, binding_not_found}; _ -> ok = sync_binding(B, Q#amqqueue.durable, fun mnesia:delete_object/3), - ok = Type:delete_binding(X, B), maybe_auto_delete(X) end end). @@ -376,7 +412,7 @@ binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) -> Fun(X, Q, #binding{exchange_name = ExchangeName, queue_name = QueueName, key = RoutingKey, - args = rabbit_misc:sort_field_table(Arguments)}) + args = sort_arguments(Arguments)}) end). sync_binding(Binding, Durable, Fun) -> @@ -434,6 +470,94 @@ reverse_binding(#binding{exchange_name = Exchange, key = Key, args = Args}. +default_headers_match_kind() -> all. + +parse_x_match(<<"all">>) -> all; +parse_x_match(<<"any">>) -> any; +parse_x_match(Other) -> + rabbit_log:warning("Invalid x-match field value ~p; expected all or any", + [Other]), + default_headers_match_kind(). + +%% Horrendous matching algorithm. Depends for its merge-like +%% (linear-time) behaviour on the lists:keysort (sort_arguments) that +%% route/3 and {add,delete}_binding/4 do. +%% +%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. +%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +%% +headers_match(Pattern, Data) -> + MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of + {value, {_, longstr, MK}} -> parse_x_match(MK); + {value, {_, Type, MK}} -> + rabbit_log:warning("Invalid x-match field type ~p " + "(value ~p); expected longstr", + [Type, MK]), + default_headers_match_kind(); + _ -> default_headers_match_kind() + end, + headers_match(Pattern, Data, true, false, MatchKind). + +headers_match([], _Data, AllMatch, _AnyMatch, all) -> + AllMatch; +headers_match([], _Data, _AllMatch, AnyMatch, any) -> + AnyMatch; +headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data, + AllMatch, AnyMatch, MatchKind) -> + headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind); +headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) -> + headers_match([], [], false, AnyMatch, MatchKind); +headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest], + AllMatch, AnyMatch, MatchKind) when PK > DK -> + headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind); +headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _], + _AllMatch, AnyMatch, MatchKind) when PK < DK -> + headers_match(PRest, Data, false, AnyMatch, MatchKind); +headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], + AllMatch, AnyMatch, MatchKind) when PK == DK -> + {AllMatch1, AnyMatch1} = + if + %% It's not properly specified, but a "no value" in a + %% pattern field is supposed to mean simple presence of + %% the corresponding data field. I've interpreted that to + %% mean a type of "void" for the pattern field. + PT == void -> {AllMatch, true}; + %% Similarly, it's not specified, but I assume that a + %% mismatched type causes a mismatched value. + PT =/= DT -> {false, AnyMatch}; + PV == DV -> {AllMatch, true}; + true -> {false, AnyMatch} + end, + headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). + +split_topic_key(Key) -> + {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."), + KeySplit. + +topic_matches(PatternKey, RoutingKey) -> + P = split_topic_key(PatternKey), + R = split_topic_key(RoutingKey), + topic_matches1(P, R). + +topic_matches1(["#"], _R) -> + true; +topic_matches1(["#" | PTail], R) -> + last_topic_match(PTail, [], lists:reverse(R)); +topic_matches1([], []) -> + true; +topic_matches1(["*" | PatRest], [_ | ValRest]) -> + topic_matches1(PatRest, ValRest); +topic_matches1([PatElement | PatRest], [ValElement | ValRest]) when PatElement == ValElement -> + topic_matches1(PatRest, ValRest); +topic_matches1(_, _) -> + false. + +last_topic_match(P, R, []) -> + topic_matches1(P, R); +last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> + topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList). + delete(ExchangeName, _IfUnused = true) -> call_with_exchange(ExchangeName, fun conditional_delete/1); delete(ExchangeName, _IfUnused = false) -> @@ -455,11 +579,10 @@ conditional_delete(Exchange = #exchange{name = ExchangeName}) -> true -> {error, in_use} end. -unconditional_delete(X = #exchange{name = ExchangeName, type = Type}) -> +unconditional_delete(#exchange{name = ExchangeName}) -> ok = delete_exchange_bindings(ExchangeName), ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}), - ok = mnesia:delete({rabbit_exchange, ExchangeName}), - ok = Type:delete(X). + ok = mnesia:delete({rabbit_exchange, ExchangeName}). %%---------------------------------------------------------------------------- %% EXTENDED API diff --git a/src/rabbit_exchange_behaviour.erl b/src/rabbit_exchange_behaviour.erl deleted file mode 100644 index 7935df6b..00000000 --- a/src/rabbit_exchange_behaviour.erl +++ /dev/null @@ -1,50 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_exchange_behaviour). - --export([behaviour_info/1]). - -behaviour_info(callbacks) -> - [ - %% Called *outside* mnesia transactions. - {description, 0}, - {publish, 2}, - - %% Called *inside* mnesia transactions, must be idempotent. - {recover, 1}, %% like init, but called on server startup for durable exchanges - {init, 1}, %% like recover, but called on declaration when previously absent - {delete, 1}, %% called on deletion - {add_binding, 2}, - {delete_binding, 2} - ]; -behaviour_info(_Other) -> - undefined. diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl deleted file mode 100644 index e6e6ae99..00000000 --- a/src/rabbit_exchange_type_direct.erl +++ /dev/null @@ -1,53 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_exchange_type_direct). --include("rabbit.hrl"). - --behaviour(rabbit_exchange_behaviour). - --export([description/0, publish/2]). --export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]). --include("rabbit_exchange_behaviour_spec.hrl"). - -description() -> - [{name, <<"direct">>}, - {description, <<"AMQP direct exchange, as per the AMQP specification">>}]. - -publish(#exchange{name = Name}, - Delivery = #delivery{message = #basic_message{routing_key = RoutingKey}}) -> - rabbit_router:deliver(rabbit_router:match_routing_key(Name, RoutingKey), Delivery). - -recover(_X) -> ok. -init(_X) -> ok. -delete(_X) -> ok. -add_binding(_X, _B) -> ok. -delete_binding(_X, _B) -> ok. diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl deleted file mode 100644 index 2194abd4..00000000 --- a/src/rabbit_exchange_type_fanout.erl +++ /dev/null @@ -1,52 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_exchange_type_fanout). --include("rabbit.hrl"). - --behaviour(rabbit_exchange_behaviour). - --export([description/0, publish/2]). --export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]). --include("rabbit_exchange_behaviour_spec.hrl"). - -description() -> - [{name, <<"fanout">>}, - {description, <<"AMQP fanout exchange, as per the AMQP specification">>}]. - -publish(#exchange{name = Name}, Delivery) -> - rabbit_router:deliver(rabbit_router:match_routing_key(Name, '_'), Delivery). - -recover(_X) -> ok. -init(_X) -> ok. -delete(_X) -> ok. -add_binding(_X, _B) -> ok. -delete_binding(_X, _B) -> ok. diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl deleted file mode 100644 index 72c85b06..00000000 --- a/src/rabbit_exchange_type_headers.erl +++ /dev/null @@ -1,127 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_exchange_type_headers). --include("rabbit.hrl"). --include("rabbit_framing.hrl"). - --behaviour(rabbit_exchange_behaviour). - --export([description/0, publish/2]). --export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]). --include("rabbit_exchange_behaviour_spec.hrl"). - --ifdef(use_specs). --spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()). --endif. - -description() -> - [{name, <<"headers">>}, - {description, <<"AMQP headers exchange, as per the AMQP specification">>}]. - -publish(#exchange{name = Name}, - Delivery = #delivery{message = #basic_message{content = Content}}) -> - Headers = case (Content#content.properties)#'P_basic'.headers of - undefined -> []; - H -> rabbit_misc:sort_field_table(H) - end, - rabbit_router:deliver(rabbit_router:match_bindings(Name, fun (#binding{args = Spec}) -> - headers_match(Spec, Headers) - end), - Delivery). - -default_headers_match_kind() -> all. - -parse_x_match(<<"all">>) -> all; -parse_x_match(<<"any">>) -> any; -parse_x_match(Other) -> - rabbit_log:warning("Invalid x-match field value ~p; expected all or any", - [Other]), - default_headers_match_kind(). - -%% Horrendous matching algorithm. Depends for its merge-like -%% (linear-time) behaviour on the lists:keysort -%% (rabbit_misc:sort_field_table) that route/3 and -%% rabbit_exchange:{add,delete}_binding/4 do. -%% -%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. -%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -%% -headers_match(Pattern, Data) -> - MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of - {value, {_, longstr, MK}} -> parse_x_match(MK); - {value, {_, Type, MK}} -> - rabbit_log:warning("Invalid x-match field type ~p " - "(value ~p); expected longstr", - [Type, MK]), - default_headers_match_kind(); - _ -> default_headers_match_kind() - end, - headers_match(Pattern, Data, true, false, MatchKind). - -headers_match([], _Data, AllMatch, _AnyMatch, all) -> - AllMatch; -headers_match([], _Data, _AllMatch, AnyMatch, any) -> - AnyMatch; -headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data, - AllMatch, AnyMatch, MatchKind) -> - headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind); -headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) -> - headers_match([], [], false, AnyMatch, MatchKind); -headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest], - AllMatch, AnyMatch, MatchKind) when PK > DK -> - headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind); -headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _], - _AllMatch, AnyMatch, MatchKind) when PK < DK -> - headers_match(PRest, Data, false, AnyMatch, MatchKind); -headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], - AllMatch, AnyMatch, MatchKind) when PK == DK -> - {AllMatch1, AnyMatch1} = - if - %% It's not properly specified, but a "no value" in a - %% pattern field is supposed to mean simple presence of - %% the corresponding data field. I've interpreted that to - %% mean a type of "void" for the pattern field. - PT == void -> {AllMatch, true}; - %% Similarly, it's not specified, but I assume that a - %% mismatched type causes a mismatched value. - PT =/= DT -> {false, AnyMatch}; - PV == DV -> {AllMatch, true}; - true -> {false, AnyMatch} - end, - headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). - -recover(_X) -> ok. -init(_X) -> ok. -delete(_X) -> ok. -add_binding(_X, _B) -> ok. -delete_binding(_X, _B) -> ok. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl deleted file mode 100644 index 738ff595..00000000 --- a/src/rabbit_exchange_type_topic.erl +++ /dev/null @@ -1,90 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_exchange_type_topic). --include("rabbit.hrl"). - --behaviour(rabbit_exchange_behaviour). - --export([description/0, publish/2]). --export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]). --include("rabbit_exchange_behaviour_spec.hrl"). - --export([topic_matches/2]). - --ifdef(use_specs). --spec(topic_matches/2 :: (binary(), binary()) -> boolean()). --endif. - -description() -> - [{name, <<"topic">>}, - {description, <<"AMQP topic exchange, as per the AMQP specification">>}]. - -publish(#exchange{name = Name}, - Delivery = #delivery{message = #basic_message{routing_key = RoutingKey}}) -> - rabbit_router:deliver(rabbit_router:match_bindings(Name, - fun (#binding{key = BindingKey}) -> - topic_matches(BindingKey, RoutingKey) - end), - Delivery). - -split_topic_key(Key) -> - {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."), - KeySplit. - -topic_matches(PatternKey, RoutingKey) -> - P = split_topic_key(PatternKey), - R = split_topic_key(RoutingKey), - topic_matches1(P, R). - -topic_matches1(["#"], _R) -> - true; -topic_matches1(["#" | PTail], R) -> - last_topic_match(PTail, [], lists:reverse(R)); -topic_matches1([], []) -> - true; -topic_matches1(["*" | PatRest], [_ | ValRest]) -> - topic_matches1(PatRest, ValRest); -topic_matches1([PatElement | PatRest], [ValElement | ValRest]) when PatElement == ValElement -> - topic_matches1(PatRest, ValRest); -topic_matches1(_, _) -> - false. - -last_topic_match(P, R, []) -> - topic_matches1(P, R); -last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> - topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList). - -recover(_X) -> ok. -init(_X) -> ok. -delete(_X) -> ok. -add_binding(_X, _B) -> ok. -delete_binding(_X, _B) -> ok. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 2a0015bf..21764fce 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -56,7 +56,6 @@ -export([format_stderr/2]). -export([start_applications/1, stop_applications/1]). -export([unfold/2, ceil/1]). --export([sort_field_table/1]). -import(mnesia). -import(lists). @@ -490,7 +489,3 @@ ceil(N) -> 0 -> N; _ -> 1 + T end. - -%% Sorts a list of AMQP table fields as per the AMQP spec -sort_field_table(Arguments) -> - lists:keysort(1, Arguments). diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index afaf9d45..10f80cc3 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -30,15 +30,12 @@ %% -module(rabbit_router). --include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). -behaviour(gen_server2). -export([start_link/0, - deliver/2, - match_bindings/2, - match_routing_key/2]). + deliver/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -132,46 +129,6 @@ deliver_per_node(NodeQPids, Delivery) -> -endif. -%% TODO: Maybe this should be handled by a cursor instead. -%% TODO: This causes a full scan for each entry with the same exchange -match_bindings(Name, Match) -> - Query = qlc:q([QName || #route{binding = Binding = #binding{ - exchange_name = ExchangeName, - queue_name = QName}} <- - mnesia:table(rabbit_route), - ExchangeName == Name, - Match(Binding)]), - lookup_qpids( - try - mnesia:async_dirty(fun qlc:e/1, [Query]) - catch exit:{aborted, {badarg, _}} -> - %% work around OTP-7025, which was fixed in R12B-1, by - %% falling back on a less efficient method - [QName || #route{binding = Binding = #binding{ - queue_name = QName}} <- - mnesia:dirty_match_object( - rabbit_route, - #route{binding = #binding{exchange_name = Name, - _ = '_'}}), - Match(Binding)] - end). - -match_routing_key(Name, RoutingKey) -> - MatchHead = #route{binding = #binding{exchange_name = Name, - queue_name = '$1', - key = RoutingKey, - _ = '_'}}, - lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). - -lookup_qpids(Queues) -> - sets:fold( - fun(Key, Acc) -> - case mnesia:dirty_read({rabbit_queue, Key}) of - [#amqqueue{pid = QPid}] -> [QPid | Acc]; - [] -> Acc - end - end, [], sets:from_list(Queues)). - %%-------------------------------------------------------------------- init([]) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 99f1bc67..ba048184 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -302,7 +302,7 @@ test_topic_match(P, R) -> test_topic_match(P, R, true). test_topic_match(P, R, Expected) -> - case rabbit_exchange_type_topic:topic_matches(list_to_binary(P), list_to_binary(R)) of + case rabbit_exchange:topic_matches(list_to_binary(P), list_to_binary(R)) of Expected -> passed; _ -> |