diff options
author | Matthew Sackman <matthew@lshift.net> | 2010-03-04 15:11:45 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2010-03-04 15:11:45 +0000 |
commit | 27e2c7185e950c4b2557d9fdcc4969db25c33663 (patch) | |
tree | 52133afaf6bb638ef184df0b553510efa648e1cc /src | |
parent | bedfa1bd114804bf77661efa25f571ee05f4a4ae (diff) | |
parent | 4dbb5487c40b0a9bd6aefdb85a9ec34fba162bde (diff) | |
download | rabbitmq-server-27e2c7185e950c4b2557d9fdcc4969db25c33663.tar.gz |
merging bug 22169 into default
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit.erl | 6 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 58 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 2 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 531 | ||||
-rw-r--r-- | src/rabbit_exchange_type.erl | 61 | ||||
-rw-r--r-- | src/rabbit_exchange_type_direct.erl | 63 | ||||
-rw-r--r-- | src/rabbit_exchange_type_fanout.erl | 61 | ||||
-rw-r--r-- | src/rabbit_exchange_type_headers.erl | 137 | ||||
-rw-r--r-- | src/rabbit_exchange_type_registry.erl | 129 | ||||
-rw-r--r-- | src/rabbit_exchange_type_topic.erl | 101 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 28 | ||||
-rw-r--r-- | src/rabbit_router.erl | 45 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 3 |
13 files changed, 890 insertions, 335 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 35d3ce4a..6084be1b 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -53,6 +53,12 @@ [{mfa, {rabbit_mnesia, init, []}}, {enables, kernel_ready}]}). +-rabbit_boot_step({rabbit_exchange_type_registry, + [{description, "exchange type registry"}, + {mfa, {rabbit_sup, start_child, + [rabbit_exchange_type_registry]}}, + {enables, kernel_ready}]}). + -rabbit_boot_step({rabbit_log, [{description, "logging server"}, {mfa, {rabbit_sup, start_child, [rabbit_log]}}, diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 31787466..ceec00fd 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -342,33 +342,41 @@ flush_all(QPids, ChPid) -> QPids). internal_delete(QueueName) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({rabbit_queue, QueueName}) of - [] -> {error, not_found}; - [_] -> - ok = rabbit_exchange:delete_queue_bindings(QueueName), - ok = mnesia:delete({rabbit_queue, QueueName}), - ok = mnesia:delete({rabbit_durable_queue, QueueName}), - ok - end - end). + case + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({rabbit_queue, QueueName}) of + [] -> {error, not_found}; + [_] -> + ok = mnesia:delete({rabbit_queue, QueueName}), + ok = mnesia:delete({rabbit_durable_queue, QueueName}), + %% we want to execute some things, as + %% decided by rabbit_exchange, after the + %% transaction. + rabbit_exchange:delete_queue_bindings(QueueName) + end + end) of + Err = {error, _} -> Err; + PostHook -> + PostHook(), + ok + end. on_node_down(Node) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - qlc:fold( - fun (QueueName, Acc) -> - ok = rabbit_exchange:delete_transient_queue_bindings( - QueueName), - ok = mnesia:delete({rabbit_queue, QueueName}), - Acc - end, - ok, - qlc:q([QueueName || #amqqueue{name = QueueName, pid = Pid} - <- mnesia:table(rabbit_queue), - node(Pid) == Node])) - end). + [Hook() || + Hook <- rabbit_misc:execute_mnesia_transaction( + fun () -> + qlc:e(qlc:q([delete_queue(QueueName) || + #amqqueue{name = QueueName, pid = Pid} + <- mnesia:table(rabbit_queue), + node(Pid) == Node])) + end)], + ok. + +delete_queue(QueueName) -> + Post = rabbit_exchange:delete_transient_queue_bindings(QueueName), + ok = mnesia:delete({rabbit_queue, QueueName}), + Post. pseudo_queue(QueueName, Pid) -> #amqqueue{name = QueueName, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 41085fb7..3597fcd7 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1,4 +1,4 @@ -%% The contents of this file are subject to the Mozilla Public Licenses +%% The contents of this file are subject to the Mozilla Public License %% Version 1.1 (the "License"); you may not use this file except in %% compliance with the License. You may obtain a copy of the License at %% http://www.mozilla.org/MPL/ diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 832acd16..1cfba00e 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -30,7 +30,6 @@ %% -module(rabbit_exchange). --include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). -include("rabbit_framing.hrl"). @@ -40,7 +39,7 @@ -export([add_binding/4, delete_binding/4, list_bindings/1]). -export([delete/2]). -export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). --export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]). +-export([check_type/1, assert_type/2]). %% EXTENDED API -export([list_exchange_bindings/1]). @@ -49,7 +48,6 @@ -import(mnesia). -import(sets). -import(lists). --import(qlc). -import(regexp). %%---------------------------------------------------------------------------- @@ -82,10 +80,8 @@ bind_res() | {'error', 'binding_not_found'}). -spec(list_bindings/1 :: (vhost()) -> [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). --spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok'). --spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok'). --spec(topic_matches/2 :: (binary(), binary()) -> boolean()). --spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()). +-spec(delete_queue_bindings/1 :: (queue_name()) -> fun(() -> none())). +-spec(delete_transient_queue_bindings/1 :: (queue_name()) -> fun(() -> none())). -spec(delete/2 :: (exchange_name(), boolean()) -> 'ok' | not_found() | {'error', 'in_use'}). -spec(list_queue_bindings/1 :: (queue_name()) -> @@ -100,17 +96,37 @@ -define(INFO_KEYS, [name, type, durable, auto_delete, arguments]. recover() -> - ok = rabbit_misc:table_foreach( - fun(Exchange) -> ok = mnesia:write(rabbit_exchange, - Exchange, write) - end, rabbit_durable_exchange), - ok = rabbit_misc:table_foreach( - fun(Route) -> {_, ReverseRoute} = route_with_reverse(Route), - ok = mnesia:write(rabbit_route, - Route, write), - ok = mnesia:write(rabbit_reverse_route, - ReverseRoute, write) - end, rabbit_durable_route). + Exs = rabbit_misc:table_fold( + fun(Exchange, Acc) -> + ok = mnesia:write(rabbit_exchange, Exchange, write), + [Exchange | Acc] + end, [], rabbit_durable_exchange), + Bs = rabbit_misc:table_fold( + fun(Route = #route{binding = B}, Acc) -> + {_, ReverseRoute} = route_with_reverse(Route), + ok = mnesia:write(rabbit_route, + Route, write), + ok = mnesia:write(rabbit_reverse_route, + ReverseRoute, write), + [B | Acc] + end, [], rabbit_durable_route), + recover_with_bindings(Bs, Exs), + ok. + +recover_with_bindings(Bs, Exs) -> + recover_with_bindings( + lists:keysort(#binding.exchange_name, Bs), + lists:keysort(#exchange.name, Exs), []). + +recover_with_bindings([B = #binding{exchange_name = Name} | Rest], + Xs = [#exchange{name = Name} | _], + Bindings) -> + recover_with_bindings(Rest, Xs, [B | Bindings]); +recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) -> + (type_to_module(Type)):recover(X, Bindings), + recover_with_bindings(Bs, Xs, []); +recover_with_bindings([], [], []) -> + ok. declare(ExchangeName, Type, Durable, AutoDelete, Args) -> Exchange = #exchange{name = ExchangeName, @@ -118,31 +134,53 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) -> durable = Durable, auto_delete = AutoDelete, arguments = Args}, - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({rabbit_exchange, ExchangeName}) of - [] -> ok = mnesia:write(rabbit_exchange, Exchange, write), - if Durable -> - ok = mnesia:write(rabbit_durable_exchange, - Exchange, write); - true -> ok - end, - Exchange; - [ExistingX] -> ExistingX - end - end). + %% We want to upset things if it isn't ok; this is different from + %% the other hooks invocations, where we tend to ignore the return + %% value. + TypeModule = type_to_module(Type), + ok = TypeModule:validate(Exchange), + case rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({rabbit_exchange, ExchangeName}) of + [] -> + ok = mnesia:write(rabbit_exchange, Exchange, write), + ok = case Durable of + true -> + mnesia:write(rabbit_durable_exchange, + Exchange, write); + false -> + ok + end, + {new, Exchange}; + [ExistingX] -> + {existing, ExistingX} + end + end) of + {new, X} -> TypeModule:create(X), + X; + {existing, X} -> X; + Err -> Err + end. -check_type(<<"fanout">>) -> - fanout; -check_type(<<"direct">>) -> - direct; -check_type(<<"topic">>) -> - topic; -check_type(<<"headers">>) -> - headers; -check_type(T) -> - rabbit_misc:protocol_error( - command_invalid, "invalid exchange type '~s'", [T]). +%% Used with atoms from records; e.g., the type is expected to exist. +type_to_module(T) -> + case rabbit_exchange_type_registry:lookup_module(T) of + {ok, Module} -> Module; + {error, not_found} -> rabbit_misc:protocol_error( + command_invalid, + "invalid exchange type '~s'", [T]) + end. + +%% Used with binaries sent over the wire; the type may not exist. +check_type(TypeBin) -> + case rabbit_exchange_type_registry:binary_to_type(TypeBin) of + {error, not_found} -> + rabbit_misc:protocol_error( + command_invalid, "unknown exchange type '~s'", [TypeBin]); + T -> + _Module = type_to_module(T), + T + end. assert_type(#exchange{ type = ActualType }, RequiredType) when ActualType == RequiredType -> @@ -157,7 +195,7 @@ lookup(Name) -> lookup_or_die(Name) -> case lookup(Name) of - {ok, X} -> X; + {ok, X} -> X; {error, not_found} -> rabbit_misc:not_found(Name) end. @@ -193,9 +231,8 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). publish(X, Delivery) -> publish(X, [], Delivery). -publish(X, Seen, Delivery = #delivery{ - message = #basic_message{routing_key = RK, content = C}}) -> - case rabbit_router:deliver(route(X, RK, C), Delivery) of +publish(X = #exchange{type = Type}, Seen, Delivery) -> + case (type_to_module(Type)):publish(X, Delivery) of {_, []} = R -> #exchange{name = XName, arguments = Args} = X, case rabbit_misc:r_arg(XName, exchange, Args, @@ -205,95 +242,24 @@ publish(X, Seen, Delivery = #delivery{ AName -> NewSeen = [XName | Seen], case lists:member(AName, NewSeen) of - true -> - R; - false -> - case lookup(AName) of - {ok, AX} -> - publish(AX, NewSeen, Delivery); - {error, not_found} -> - rabbit_log:warning( - "alternate exchange for ~s " - "does not exist: ~s", - [rabbit_misc:rs(XName), - rabbit_misc:rs(AName)]), - R - end + true -> R; + false -> case lookup(AName) of + {ok, AX} -> + publish(AX, NewSeen, Delivery); + {error, not_found} -> + rabbit_log:warning( + "alternate exchange for ~s " + "does not exist: ~s", + [rabbit_misc:rs(XName), + rabbit_misc:rs(AName)]), + R + end end end; R -> R end. -%% return the list of qpids to which a message with a given routing -%% key, sent to a particular exchange, should be delivered. -%% -%% The function ensures that a qpid appears in the return list exactly -%% as many times as a message should be delivered to it. With the -%% current exchange types that is at most once. -route(X = #exchange{type = topic}, RoutingKey, _Content) -> - match_bindings(X, fun (#binding{key = BindingKey}) -> - topic_matches(BindingKey, RoutingKey) - end); - -route(X = #exchange{type = headers}, _RoutingKey, Content) -> - Headers = case (Content#content.properties)#'P_basic'.headers of - undefined -> []; - H -> sort_arguments(H) - end, - match_bindings(X, fun (#binding{args = Spec}) -> - headers_match(Spec, Headers) - end); - -route(X = #exchange{type = fanout}, _RoutingKey, _Content) -> - match_routing_key(X, '_'); - -route(X = #exchange{type = direct}, RoutingKey, _Content) -> - match_routing_key(X, RoutingKey). - -sort_arguments(Arguments) -> - lists:keysort(1, Arguments). - -%% TODO: Maybe this should be handled by a cursor instead. -%% TODO: This causes a full scan for each entry with the same exchange -match_bindings(#exchange{name = Name}, Match) -> - Query = qlc:q([QName || #route{binding = Binding = #binding{ - exchange_name = ExchangeName, - queue_name = QName}} <- - mnesia:table(rabbit_route), - ExchangeName == Name, - Match(Binding)]), - lookup_qpids( - try - mnesia:async_dirty(fun qlc:e/1, [Query]) - catch exit:{aborted, {badarg, _}} -> - %% work around OTP-7025, which was fixed in R12B-1, by - %% falling back on a less efficient method - [QName || #route{binding = Binding = #binding{ - queue_name = QName}} <- - mnesia:dirty_match_object( - rabbit_route, - #route{binding = #binding{exchange_name = Name, - _ = '_'}}), - Match(Binding)] - end). - -match_routing_key(#exchange{name = Name}, RoutingKey) -> - MatchHead = #route{binding = #binding{exchange_name = Name, - queue_name = '$1', - key = RoutingKey, - _ = '_'}}, - lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). - -lookup_qpids(Queues) -> - sets:fold( - fun(Key, Acc) -> - case mnesia:dirty_read({rabbit_queue, Key}) of - [#amqqueue{pid = QPid}] -> [QPid | Acc]; - [] -> Acc - end - end, [], sets:from_list(Queues)). - %% TODO: Should all of the route and binding management not be %% refactored to its own module, especially seeing as unbind will have %% to be implemented for 0.91 ? @@ -302,13 +268,13 @@ delete_exchange_bindings(ExchangeName) -> [begin ok = mnesia:delete_object(rabbit_reverse_route, reverse_route(Route), write), - ok = delete_forward_routes(Route) + ok = delete_forward_routes(Route), + Route#route.binding end || Route <- mnesia:match_object( rabbit_route, #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, - write)], - ok. + write)]. delete_queue_bindings(QueueName) -> delete_queue_bindings(QueueName, fun delete_forward_routes/1). @@ -317,21 +283,55 @@ delete_transient_queue_bindings(QueueName) -> delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1). delete_queue_bindings(QueueName, FwdDeleteFun) -> - Exchanges = exchanges_for_queue(QueueName), - [begin - ok = FwdDeleteFun(reverse_route(Route)), - ok = mnesia:delete_object(rabbit_reverse_route, Route, write) - end || Route <- mnesia:match_object( - rabbit_reverse_route, - reverse_route( - #route{binding = #binding{queue_name = QueueName, - _ = '_'}}), - write)], - [begin - [X] = mnesia:read({rabbit_exchange, ExchangeName}), - ok = maybe_auto_delete(X) - end || ExchangeName <- Exchanges], - ok. + DeletedBindings = + [begin + Route = reverse_route(ReverseRoute), + ok = FwdDeleteFun(Route), + ok = mnesia:delete_object(rabbit_reverse_route, + ReverseRoute, write), + Route#route.binding + end || ReverseRoute + <- mnesia:match_object( + rabbit_reverse_route, + reverse_route(#route{binding = #binding{ + queue_name = QueueName, + _ = '_'}}), + write)], + Cleanup = cleanup_deleted_queue_bindings( + lists:keysort(#binding.exchange_name, DeletedBindings), []), + fun () -> + lists:foreach( + fun ({{IsDeleted, X = #exchange{ type = Type }}, Bs}) -> + Module = type_to_module(Type), + case IsDeleted of + auto_deleted -> Module:delete(X, Bs); + no_delete -> Module:remove_bindings(X, Bs) + end + end, Cleanup) + end. + +%% Requires that its input binding list is sorted in exchange-name +%% order, so that the grouping of bindings (for passing to +%% cleanup_deleted_queue_bindings1) works properly. +cleanup_deleted_queue_bindings([], Acc) -> + Acc; +cleanup_deleted_queue_bindings( + [B = #binding{exchange_name = ExchangeName} | Bs], Acc) -> + cleanup_deleted_queue_bindings(ExchangeName, Bs, [B], Acc). + +cleanup_deleted_queue_bindings( + ExchangeName, [B = #binding{exchange_name = ExchangeName} | Bs], + Bindings, Acc) -> + cleanup_deleted_queue_bindings(ExchangeName, Bs, [B | Bindings], Acc); +cleanup_deleted_queue_bindings(ExchangeName, Deleted, Bindings, Acc) -> + %% either Deleted is [], or its head has a non-matching ExchangeName + NewAcc = [cleanup_deleted_queue_bindings1(ExchangeName, Bindings) | Acc], + cleanup_deleted_queue_bindings(Deleted, NewAcc). + +cleanup_deleted_queue_bindings1(ExchangeName, Bindings) -> + [X] = mnesia:read({rabbit_exchange, ExchangeName}), + {maybe_auto_delete(X), Bindings}. + delete_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write), @@ -340,15 +340,6 @@ delete_forward_routes(Route) -> delete_transient_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write). -exchanges_for_queue(QueueName) -> - MatchHead = reverse_route( - #route{binding = #binding{exchange_name = '$1', - queue_name = QueueName, - _ = '_'}}), - sets:to_list( - sets:from_list( - mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))). - contains(Table, MatchHead) -> try continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)) @@ -385,37 +376,61 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) -> end). add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> - binding_action( - ExchangeName, QueueName, RoutingKey, Arguments, - fun (X, Q, B) -> - if Q#amqqueue.durable and not(X#exchange.durable) -> - {error, durability_settings_incompatible}; - true -> ok = sync_binding(B, Q#amqqueue.durable, - fun mnesia:write/3) - end - end). + case binding_action( + ExchangeName, QueueName, RoutingKey, Arguments, + fun (X, Q, B) -> + if Q#amqqueue.durable and not(X#exchange.durable) -> + {error, durability_settings_incompatible}; + true -> + case mnesia:read(rabbit_route, B) of + [] -> + sync_binding(B, Q#amqqueue.durable, + fun mnesia:write/3), + {new, X, B}; + [_R] -> + {existing, X, B} + end + end + end) of + {new, Exchange = #exchange{ type = Type }, Binding} -> + (type_to_module(Type)):add_binding(Exchange, Binding); + {existing, _, _} -> + ok; + Err = {error, _} -> + Err + end. delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> - binding_action( - ExchangeName, QueueName, RoutingKey, Arguments, - fun (X, Q, B) -> - case mnesia:match_object(rabbit_route, #route{binding = B}, - write) of - [] -> {error, binding_not_found}; - _ -> ok = sync_binding(B, Q#amqqueue.durable, - fun mnesia:delete_object/3), - maybe_auto_delete(X) - end - end). + case binding_action( + ExchangeName, QueueName, RoutingKey, Arguments, + fun (X, Q, B) -> + case mnesia:match_object(rabbit_route, #route{binding = B}, + write) of + [] -> {error, binding_not_found}; + _ -> ok = sync_binding(B, Q#amqqueue.durable, + fun mnesia:delete_object/3), + {maybe_auto_delete(X), B} + end + end) of + Err = {error, _} -> + Err; + {{Action, X = #exchange{ type = Type }}, B} -> + Module = type_to_module(Type), + case Action of + auto_delete -> Module:delete(X, [B]); + no_delete -> Module:remove_bindings(X, [B]) + end + end. binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) -> call_with_exchange_and_queue( ExchangeName, QueueName, fun (X, Q) -> - Fun(X, Q, #binding{exchange_name = ExchangeName, - queue_name = QueueName, - key = RoutingKey, - args = sort_arguments(Arguments)}) + Fun(X, Q, #binding{ + exchange_name = ExchangeName, + queue_name = QueueName, + key = RoutingKey, + args = rabbit_misc:sort_field_table(Arguments)}) end). sync_binding(Binding, Durable, Fun) -> @@ -440,8 +455,8 @@ list_bindings(VHostPath) -> rabbit_route, #route{binding = #binding{ exchange_name = rabbit_misc:r(VHostPath, exchange), - _ = '_'}, - _ = '_'})]. + _ = '_'}, + _ = '_'})]. route_with_reverse(#route{binding = Binding}) -> route_with_reverse(Binding); @@ -456,136 +471,60 @@ reverse_route(#reverse_route{reverse_binding = Binding}) -> #route{binding = reverse_binding(Binding)}. reverse_binding(#reverse_binding{exchange_name = Exchange, - queue_name = Queue, - key = Key, - args = Args}) -> + queue_name = Queue, + key = Key, + args = Args}) -> #binding{exchange_name = Exchange, - queue_name = Queue, - key = Key, - args = Args}; + queue_name = Queue, + key = Key, + args = Args}; reverse_binding(#binding{exchange_name = Exchange, - queue_name = Queue, - key = Key, - args = Args}) -> + queue_name = Queue, + key = Key, + args = Args}) -> #reverse_binding{exchange_name = Exchange, - queue_name = Queue, - key = Key, - args = Args}. - -default_headers_match_kind() -> all. - -parse_x_match(<<"all">>) -> all; -parse_x_match(<<"any">>) -> any; -parse_x_match(Other) -> - rabbit_log:warning("Invalid x-match field value ~p; expected all or any", - [Other]), - default_headers_match_kind(). - -%% Horrendous matching algorithm. Depends for its merge-like -%% (linear-time) behaviour on the lists:keysort (sort_arguments) that -%% route/3 and {add,delete}_binding/4 do. -%% -%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. -%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -%% -headers_match(Pattern, Data) -> - MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of - {value, {_, longstr, MK}} -> parse_x_match(MK); - {value, {_, Type, MK}} -> - rabbit_log:warning("Invalid x-match field type ~p " - "(value ~p); expected longstr", - [Type, MK]), - default_headers_match_kind(); - _ -> default_headers_match_kind() - end, - headers_match(Pattern, Data, true, false, MatchKind). - -headers_match([], _Data, AllMatch, _AnyMatch, all) -> - AllMatch; -headers_match([], _Data, _AllMatch, AnyMatch, any) -> - AnyMatch; -headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data, - AllMatch, AnyMatch, MatchKind) -> - headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind); -headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) -> - headers_match([], [], false, AnyMatch, MatchKind); -headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest], - AllMatch, AnyMatch, MatchKind) when PK > DK -> - headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind); -headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _], - _AllMatch, AnyMatch, MatchKind) when PK < DK -> - headers_match(PRest, Data, false, AnyMatch, MatchKind); -headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], - AllMatch, AnyMatch, MatchKind) when PK == DK -> - {AllMatch1, AnyMatch1} = - if - %% It's not properly specified, but a "no value" in a - %% pattern field is supposed to mean simple presence of - %% the corresponding data field. I've interpreted that to - %% mean a type of "void" for the pattern field. - PT == void -> {AllMatch, true}; - %% Similarly, it's not specified, but I assume that a - %% mismatched type causes a mismatched value. - PT =/= DT -> {false, AnyMatch}; - PV == DV -> {AllMatch, true}; - true -> {false, AnyMatch} - end, - headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). - -split_topic_key(Key) -> - {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."), - KeySplit. - -topic_matches(PatternKey, RoutingKey) -> - P = split_topic_key(PatternKey), - R = split_topic_key(RoutingKey), - topic_matches1(P, R). - -topic_matches1(["#"], _R) -> - true; -topic_matches1(["#" | PTail], R) -> - last_topic_match(PTail, [], lists:reverse(R)); -topic_matches1([], []) -> - true; -topic_matches1(["*" | PatRest], [_ | ValRest]) -> - topic_matches1(PatRest, ValRest); -topic_matches1([PatElement | PatRest], [ValElement | ValRest]) when PatElement == ValElement -> - topic_matches1(PatRest, ValRest); -topic_matches1(_, _) -> - false. - -last_topic_match(P, R, []) -> - topic_matches1(P, R); -last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> - topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList). - -delete(ExchangeName, _IfUnused = true) -> - call_with_exchange(ExchangeName, fun conditional_delete/1); -delete(ExchangeName, _IfUnused = false) -> - call_with_exchange(ExchangeName, fun unconditional_delete/1). - -maybe_auto_delete(#exchange{auto_delete = false}) -> - ok; + queue_name = Queue, + key = Key, + args = Args}. + +delete(ExchangeName, IfUnused) -> + Fun = case IfUnused of + true -> fun conditional_delete/1; + false -> fun unconditional_delete/1 + end, + case call_with_exchange(ExchangeName, Fun) of + {deleted, X = #exchange{type = Type}, Bs} -> + (type_to_module(Type)):delete(X, Bs), + ok; + Error = {error, _InUseOrNotFound} -> + Error + end. + +maybe_auto_delete(Exchange = #exchange{auto_delete = false}) -> + {no_delete, Exchange}; maybe_auto_delete(Exchange = #exchange{auto_delete = true}) -> - conditional_delete(Exchange), - ok. + case conditional_delete(Exchange) of + {error, in_use} -> {no_delete, Exchange}; + {deleted, Exchange, []} -> {auto_deleted, Exchange} + end. conditional_delete(Exchange = #exchange{name = ExchangeName}) -> Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, %% we need to check for durable routes here too in case a bunch of %% routes to durable queues have been removed temporarily as a %% result of a node failure - case contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match) of + case contains(rabbit_route, Match) orelse + contains(rabbit_durable_route, Match) of false -> unconditional_delete(Exchange); true -> {error, in_use} end. -unconditional_delete(#exchange{name = ExchangeName}) -> - ok = delete_exchange_bindings(ExchangeName), +unconditional_delete(Exchange = #exchange{name = ExchangeName}) -> + Bindings = delete_exchange_bindings(ExchangeName), ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}), - ok = mnesia:delete({rabbit_exchange, ExchangeName}). + ok = mnesia:delete({rabbit_exchange, ExchangeName}), + {deleted, Exchange, Bindings}. %%---------------------------------------------------------------------------- %% EXTENDED API diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl new file mode 100644 index 00000000..a8c071e6 --- /dev/null +++ b/src/rabbit_exchange_type.erl @@ -0,0 +1,61 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_exchange_type). + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [ + {description, 0}, + {publish, 2}, + + %% called BEFORE declaration, to check args etc; may exit with #amqp_error{} + {validate, 1}, + + %% called after declaration when previously absent + {create, 1}, + + %% called when recovering + {recover, 2}, + + %% called after exchange deletion. + {delete, 2}, + + %% called after a binding has been added + {add_binding, 2}, + + %% called after bindings have been deleted. + {remove_bindings, 2} + + ]; +behaviour_info(_Other) -> + undefined. diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl new file mode 100644 index 00000000..9b71e0e1 --- /dev/null +++ b/src/rabbit_exchange_type_direct.erl @@ -0,0 +1,63 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_exchange_type_direct). +-include("rabbit.hrl"). + +-behaviour(rabbit_exchange_type). + +-export([description/0, publish/2]). +-export([validate/1, create/1, recover/2, delete/2, + add_binding/2, remove_bindings/2]). +-include("rabbit_exchange_type_spec.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "exchange type direct"}, + {mfa, {rabbit_exchange_type_registry, register, + [<<"direct">>, ?MODULE]}}, + {requires, rabbit_exchange_type_registry}, + {enables, kernel_ready}]}). + +description() -> + [{name, <<"direct">>}, + {description, <<"AMQP direct exchange, as per the AMQP specification">>}]. + +publish(#exchange{name = Name}, Delivery = + #delivery{message = #basic_message{routing_key = RoutingKey}}) -> + rabbit_router:deliver(rabbit_router:match_routing_key(Name, RoutingKey), + Delivery). + +validate(_X) -> ok. +create(_X) -> ok. +recover(_X, _Bs) -> ok. +delete(_X, _Bs) -> ok. +add_binding(_X, _B) -> ok. +remove_bindings(_X, _Bs) -> ok. diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl new file mode 100644 index 00000000..311654ab --- /dev/null +++ b/src/rabbit_exchange_type_fanout.erl @@ -0,0 +1,61 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_exchange_type_fanout). +-include("rabbit.hrl"). + +-behaviour(rabbit_exchange_type). + +-export([description/0, publish/2]). +-export([validate/1, create/1, recover/2, delete/2, + add_binding/2, remove_bindings/2]). +-include("rabbit_exchange_type_spec.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "exchange type fanout"}, + {mfa, {rabbit_exchange_type_registry, register, + [<<"fanout">>, ?MODULE]}}, + {requires, rabbit_exchange_type_registry}, + {enables, kernel_ready}]}). + +description() -> + [{name, <<"fanout">>}, + {description, <<"AMQP fanout exchange, as per the AMQP specification">>}]. + +publish(#exchange{name = Name}, Delivery) -> + rabbit_router:deliver(rabbit_router:match_routing_key(Name, '_'), Delivery). + +validate(_X) -> ok. +create(_X) -> ok. +recover(_X, _Bs) -> ok. +delete(_X, _Bs) -> ok. +add_binding(_X, _B) -> ok. +remove_bindings(_X, _Bs) -> ok. diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl new file mode 100644 index 00000000..285dab1a --- /dev/null +++ b/src/rabbit_exchange_type_headers.erl @@ -0,0 +1,137 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_exchange_type_headers). +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). + +-behaviour(rabbit_exchange_type). + +-export([description/0, publish/2]). +-export([validate/1, create/1, recover/2, delete/2, + add_binding/2, remove_bindings/2]). +-include("rabbit_exchange_type_spec.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "exchange type headers"}, + {mfa, {rabbit_exchange_type_registry, register, + [<<"headers">>, ?MODULE]}}, + {requires, rabbit_exchange_type_registry}, + {enables, kernel_ready}]}). + +-ifdef(use_specs). +-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()). +-endif. + +description() -> + [{name, <<"headers">>}, + {description, <<"AMQP headers exchange, as per the AMQP specification">>}]. + +publish(#exchange{name = Name}, + Delivery = #delivery{message = #basic_message{content = Content}}) -> + Headers = case (Content#content.properties)#'P_basic'.headers of + undefined -> []; + H -> rabbit_misc:sort_field_table(H) + end, + rabbit_router:deliver(rabbit_router:match_bindings( + Name, fun (#binding{args = Spec}) -> + headers_match(Spec, Headers) + end), + Delivery). + +default_headers_match_kind() -> all. + +parse_x_match(<<"all">>) -> all; +parse_x_match(<<"any">>) -> any; +parse_x_match(Other) -> + rabbit_log:warning("Invalid x-match field value ~p; expected all or any", + [Other]), + default_headers_match_kind(). + +%% Horrendous matching algorithm. Depends for its merge-like +%% (linear-time) behaviour on the lists:keysort +%% (rabbit_misc:sort_field_table) that route/3 and +%% rabbit_exchange:{add,delete}_binding/4 do. +%% +%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. +%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +%% +headers_match(Pattern, Data) -> + MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of + {value, {_, longstr, MK}} -> parse_x_match(MK); + {value, {_, Type, MK}} -> + rabbit_log:warning("Invalid x-match field type ~p " + "(value ~p); expected longstr", + [Type, MK]), + default_headers_match_kind(); + _ -> default_headers_match_kind() + end, + headers_match(Pattern, Data, true, false, MatchKind). + +headers_match([], _Data, AllMatch, _AnyMatch, all) -> + AllMatch; +headers_match([], _Data, _AllMatch, AnyMatch, any) -> + AnyMatch; +headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data, + AllMatch, AnyMatch, MatchKind) -> + headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind); +headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) -> + headers_match([], [], false, AnyMatch, MatchKind); +headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest], + AllMatch, AnyMatch, MatchKind) when PK > DK -> + headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind); +headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _], + _AllMatch, AnyMatch, MatchKind) when PK < DK -> + headers_match(PRest, Data, false, AnyMatch, MatchKind); +headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], + AllMatch, AnyMatch, MatchKind) when PK == DK -> + {AllMatch1, AnyMatch1} = + if + %% It's not properly specified, but a "no value" in a + %% pattern field is supposed to mean simple presence of + %% the corresponding data field. I've interpreted that to + %% mean a type of "void" for the pattern field. + PT == void -> {AllMatch, true}; + %% Similarly, it's not specified, but I assume that a + %% mismatched type causes a mismatched value. + PT =/= DT -> {false, AnyMatch}; + PV == DV -> {AllMatch, true}; + true -> {false, AnyMatch} + end, + headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). + +validate(_X) -> ok. +create(_X) -> ok. +recover(_X, _Bs) -> ok. +delete(_X, _Bs) -> ok. +add_binding(_X, _B) -> ok. +remove_bindings(_X, _Bs) -> ok. diff --git a/src/rabbit_exchange_type_registry.erl b/src/rabbit_exchange_type_registry.erl new file mode 100644 index 00000000..175d15ad --- /dev/null +++ b/src/rabbit_exchange_type_registry.erl @@ -0,0 +1,129 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_exchange_type_registry). + +-behaviour(gen_server). + +-export([start_link/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([register/2, binary_to_type/1, lookup_module/1]). + +-define(SERVER, ?MODULE). +-define(ETS_NAME, ?MODULE). + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> 'ignore' | {'error', term()} | {'ok', pid()}). +-spec(register/2 :: (binary(), atom()) -> 'ok'). +-spec(binary_to_type/1 :: (binary()) -> atom() | {'error', 'not_found'}). +-spec(lookup_module/1 :: (atom()) -> {'ok', atom()} | {'error', 'not_found'}). + +-endif. + +%%--------------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%--------------------------------------------------------------------------- + +register(TypeName, ModuleName) -> + gen_server:call(?SERVER, {register, TypeName, ModuleName}). + +%% This is used with user-supplied arguments (e.g., on exchange +%% declare), so we restrict it to existing atoms only. This means it +%% can throw a badarg, indicating that the type cannot have been +%% registered. +binary_to_type(TypeBin) when is_binary(TypeBin) -> + case catch list_to_existing_atom(binary_to_list(TypeBin)) of + {'EXIT', {badarg, _}} -> {error, not_found}; + TypeAtom -> TypeAtom + end. + +lookup_module(T) when is_atom(T) -> + case ets:lookup(?ETS_NAME, T) of + [{_, Module}] -> + {ok, Module}; + [] -> + {error, not_found} + end. + +%%--------------------------------------------------------------------------- + +internal_binary_to_type(TypeBin) when is_binary(TypeBin) -> + list_to_atom(binary_to_list(TypeBin)). + +internal_register(TypeName, ModuleName) + when is_binary(TypeName), is_atom(ModuleName) -> + ok = sanity_check_module(ModuleName), + true = ets:insert(?ETS_NAME, + {internal_binary_to_type(TypeName), ModuleName}), + ok. + +sanity_check_module(Module) -> + case catch lists:member(rabbit_exchange_type, + lists:flatten( + [Bs || {Attr, Bs} <- + Module:module_info(attributes), + Attr =:= behavior orelse + Attr =:= behaviour])) of + {'EXIT', {undef, _}} -> {error, not_module}; + false -> {error, not_exchange_type}; + true -> ok + end. + +%%--------------------------------------------------------------------------- + +init([]) -> + ?ETS_NAME = ets:new(?ETS_NAME, [protected, set, named_table]), + {ok, none}. + +handle_call({register, TypeName, ModuleName}, _From, State) -> + ok = internal_register(TypeName, ModuleName), + {reply, ok, State}; +handle_call(Request, _From, State) -> + {stop, {unhandled_call, Request}, State}. + +handle_cast(Request, State) -> + {stop, {unhandled_cast, Request}, State}. + +handle_info(Message, State) -> + {stop, {unhandled_info, Message}, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl new file mode 100644 index 00000000..8a3dceea --- /dev/null +++ b/src/rabbit_exchange_type_topic.erl @@ -0,0 +1,101 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_exchange_type_topic). +-include("rabbit.hrl"). + +-behaviour(rabbit_exchange_type). + +-export([description/0, publish/2]). +-export([validate/1, create/1, recover/2, delete/2, + add_binding/2, remove_bindings/2]). +-include("rabbit_exchange_type_spec.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "exchange type topic"}, + {mfa, {rabbit_exchange_type_registry, register, + [<<"topic">>, ?MODULE]}}, + {requires, rabbit_exchange_type_registry}, + {enables, kernel_ready}]}). + +-export([topic_matches/2]). + +-ifdef(use_specs). +-spec(topic_matches/2 :: (binary(), binary()) -> boolean()). +-endif. + +description() -> + [{name, <<"topic">>}, + {description, <<"AMQP topic exchange, as per the AMQP specification">>}]. + +publish(#exchange{name = Name}, Delivery = + #delivery{message = #basic_message{routing_key = RoutingKey}}) -> + rabbit_router:deliver(rabbit_router:match_bindings( + Name, fun (#binding{key = BindingKey}) -> + topic_matches(BindingKey, RoutingKey) + end), + Delivery). + +split_topic_key(Key) -> + {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."), + KeySplit. + +topic_matches(PatternKey, RoutingKey) -> + P = split_topic_key(PatternKey), + R = split_topic_key(RoutingKey), + topic_matches1(P, R). + +topic_matches1(["#"], _R) -> + true; +topic_matches1(["#" | PTail], R) -> + last_topic_match(PTail, [], lists:reverse(R)); +topic_matches1([], []) -> + true; +topic_matches1(["*" | PatRest], [_ | ValRest]) -> + topic_matches1(PatRest, ValRest); +topic_matches1([PatElement | PatRest], [ValElement | ValRest]) + when PatElement == ValElement -> + topic_matches1(PatRest, ValRest); +topic_matches1(_, _) -> + false. + +last_topic_match(P, R, []) -> + topic_matches1(P, R); +last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> + topic_matches1(P, R) or + last_topic_match(P, [BacktrackNext | R], BacktrackList). + +validate(_X) -> ok. +create(_X) -> ok. +recover(_X, _Bs) -> ok. +delete(_X, _Bs) -> ok. +add_binding(_X, _B) -> ok. +remove_bindings(_X, _Bs) -> ok. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 0654f58a..9abc1695 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -49,13 +49,14 @@ -export([ensure_ok/2]). -export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]). -export([intersperse/2, upmap/2, map_in_order/2]). --export([table_foreach/2]). +-export([table_fold/3]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). -export([read_term_file/1, write_term_file/2]). -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). -export([start_applications/1, stop_applications/1]). -export([unfold/2, ceil/1, queue_fold/3]). +-export([sort_field_table/1]). -export([pid_to_string/1, string_to_pid/1]). -export([version_compare/2, version_compare/3]). @@ -114,7 +115,7 @@ -spec(intersperse/2 :: (A, [A]) -> [A]). -spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]). -spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]). --spec(table_foreach/2 :: (fun ((any()) -> any()), atom()) -> 'ok'). +-spec(table_fold/3 :: (fun ((any(), A) -> A), A, atom()) -> A). -spec(dirty_read_all/1 :: (atom()) -> [any()]). -spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) -> 'ok' | 'aborted'). @@ -129,6 +130,7 @@ -spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). -spec(ceil/1 :: (number()) -> number()). -spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B). +-spec(sort_field_table/1 :: (amqp_table()) -> amqp_table()). -spec(pid_to_string/1 :: (pid()) -> string()). -spec(string_to_pid/1 :: (string()) -> pid()). @@ -357,20 +359,20 @@ map_in_order(F, L) -> lists:reverse( lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)). -%% For each entry in a table, execute a function in a transaction. -%% This is often far more efficient than wrapping a tx around the lot. +%% Fold over each entry in a table, executing the cons function in a +%% transaction. This is often far more efficient than wrapping a tx +%% around the lot. %% %% We ignore entries that have been modified or removed. -table_foreach(F, TableName) -> - lists:foreach( - fun (E) -> execute_mnesia_transaction( +table_fold(F, Acc0, TableName) -> + lists:foldl( + fun (E, Acc) -> execute_mnesia_transaction( fun () -> case mnesia:match_object(TableName, E, read) of - [] -> ok; - _ -> F(E) + [] -> Acc; + _ -> F(E, Acc) end end) - end, dirty_read_all(TableName)), - ok. + end, Acc0, dirty_read_all(TableName)). dirty_read_all(TableName) -> mnesia:dirty_select(TableName, [{'$1',[],['$1']}]). @@ -504,6 +506,10 @@ queue_fold(Fun, Init, Q) -> {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1) end. +%% Sorts a list of AMQP table fields as per the AMQP spec +sort_field_table(Arguments) -> + lists:keysort(1, Arguments). + %% This provides a string representation of a pid that is the same %% regardless of what node we are running on. The representation also %% permits easy identification of the pid's node. diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index ee2b82c5..884ea4ab 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -30,12 +30,15 @@ %% -module(rabbit_router). +-include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). -behaviour(gen_server2). -export([start_link/0, - deliver/2]). + deliver/2, + match_bindings/2, + match_routing_key/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -129,6 +132,46 @@ deliver_per_node(NodeQPids, Delivery) -> -endif. +%% TODO: Maybe this should be handled by a cursor instead. +%% TODO: This causes a full scan for each entry with the same exchange +match_bindings(Name, Match) -> + Query = qlc:q([QName || #route{binding = Binding = #binding{ + exchange_name = ExchangeName, + queue_name = QName}} <- + mnesia:table(rabbit_route), + ExchangeName == Name, + Match(Binding)]), + lookup_qpids( + try + mnesia:async_dirty(fun qlc:e/1, [Query]) + catch exit:{aborted, {badarg, _}} -> + %% work around OTP-7025, which was fixed in R12B-1, by + %% falling back on a less efficient method + [QName || #route{binding = Binding = #binding{ + queue_name = QName}} <- + mnesia:dirty_match_object( + rabbit_route, + #route{binding = #binding{exchange_name = Name, + _ = '_'}}), + Match(Binding)] + end). + +match_routing_key(Name, RoutingKey) -> + MatchHead = #route{binding = #binding{exchange_name = Name, + queue_name = '$1', + key = RoutingKey, + _ = '_'}}, + lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). + +lookup_qpids(Queues) -> + sets:fold( + fun(Key, Acc) -> + case mnesia:dirty_read({rabbit_queue, Key}) of + [#amqqueue{pid = QPid}] -> [QPid | Acc]; + [] -> Acc + end + end, [], sets:from_list(Queues)). + %%-------------------------------------------------------------------- init([]) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 51d95c35..82f2d199 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -330,7 +330,8 @@ test_topic_match(P, R) -> test_topic_match(P, R, true). test_topic_match(P, R, Expected) -> - case rabbit_exchange:topic_matches(list_to_binary(P), list_to_binary(R)) of + case rabbit_exchange_type_topic:topic_matches(list_to_binary(P), + list_to_binary(R)) of Expected -> passed; _ -> |