diff options
author | Emile Joubert <emile@rabbitmq.com> | 2013-04-12 17:19:33 +0100 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2013-04-12 17:19:33 +0100 |
commit | 6ed406a98be9ed57bdc6930f226aaa9032c77b80 (patch) | |
tree | 0d6a2e8898ea83efc4da57e61fc086347d7be58f | |
parent | 13d5ac8c71cea4835be5e6d42e66d5c6cf36b20f (diff) | |
parent | d28654b6d985242ea3d6fc80c426f1d89dd59b1d (diff) | |
download | rabbitmq-server-6ed406a98be9ed57bdc6930f226aaa9032c77b80.tar.gz |
Merge default into bug25494
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 40 | ||||
-rw-r--r-- | src/rabbit_exchange_decorator.erl | 38 | ||||
-rw-r--r-- | src/rabbit_policy.erl | 16 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 5 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 16 |
6 files changed, 81 insertions, 36 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index eeee799e..4282755d 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -40,7 +40,7 @@ -record(resource, {virtual_host, kind, name}). -record(exchange, {name, type, durable, auto_delete, internal, arguments, - scratches, policy}). + scratches, policy, decorators}). -record(exchange_serial, {name, next}). -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 9e98448d..c924f53a 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -115,23 +115,27 @@ recover() -> rabbit_durable_exchange), [XName || #exchange{name = XName} <- Xs]. -callback(X = #exchange{type = XType}, Fun, Serial0, Args) -> +callback(X = #exchange{type = XType, + decorators = Decorators}, Fun, Serial0, Args) -> Serial = if is_function(Serial0) -> Serial0; is_atom(Serial0) -> fun (_Bool) -> Serial0 end end, [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args]) || - M <- registry_lookup(exchange_decorator)], + M <- rabbit_exchange_decorator:select(all, Decorators)], Module = type_to_module(XType), apply(Module, Fun, [Serial(Module:serialise_events()) | Args]). -policy_changed(X = #exchange{type = XType}, X1) -> +policy_changed(X = #exchange{type = XType, + decorators = Decorators}, X1) -> [ok = M:policy_changed(X, X1) || - M <- [type_to_module(XType) | registry_lookup(exchange_decorator)]], + M <- [type_to_module(XType) | + rabbit_exchange_decorator:select(all, Decorators)]], ok. -serialise_events(X = #exchange{type = Type}) -> - lists:any(fun (M) -> M:serialise_events(X) end, - registry_lookup(exchange_decorator)) +serialise_events(X = #exchange{type = Type, decorators = Decorators}) -> + lists:any(fun (M) -> + M:serialise_events(X) + end, rabbit_exchange_decorator:select(all, Decorators)) orelse (type_to_module(Type)):serialise_events(). serial(#exchange{name = XName} = X) -> @@ -143,16 +147,6 @@ serial(#exchange{name = XName} = X) -> (false) -> none end. -registry_lookup(exchange_decorator_route = Class) -> - case get(exchange_decorator_route_modules) of - undefined -> Mods = [M || {_, M} <- rabbit_registry:lookup_all(Class)], - put(exchange_decorator_route_modules, Mods), - Mods; - Mods -> Mods - end; -registry_lookup(Class) -> - [M || {_, M} <- rabbit_registry:lookup_all(Class)]. - declare(XName, Type, Durable, AutoDelete, Internal, Args) -> X = rabbit_policy:set(#exchange{name = XName, type = Type, @@ -318,15 +312,15 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). -route(#exchange{name = #resource{virtual_host = VHost, - name = RName} = XName} = X, +route(#exchange{name = #resource{virtual_host = VHost, name = RName} = XName, + decorators = Decorators} = X, #delivery{message = #basic_message{routing_keys = RKs}} = Delivery) -> - case {registry_lookup(exchange_decorator_route), RName == <<"">>} of - {[], true} -> + case {RName, rabbit_exchange_decorator:select(route, Decorators)} of + {<<"">>, []} -> %% Optimisation [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)]; - {Decorators, _} -> - lists:usort(route1(Delivery, Decorators, {[X], XName, []})) + {_, SelectedDecorators} -> + lists:usort(route1(Delivery, SelectedDecorators, {[X], XName, []})) end. route1(_, _, {[], _, QNames}) -> diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl index 040b55db..3bc9de1e 100644 --- a/src/rabbit_exchange_decorator.erl +++ b/src/rabbit_exchange_decorator.erl @@ -16,6 +16,10 @@ -module(rabbit_exchange_decorator). +-include("rabbit.hrl"). + +-export([select/2, set/1]). + %% This is like an exchange type except that: %% %% 1) It applies to all exchanges as soon as it is installed, therefore @@ -57,10 +61,13 @@ -callback remove_bindings(serial(), rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'. -%% Decorators can optionally implement route/2 which allows additional -%% destinations to be added to the routing decision. -%% -callback route(rabbit_types:exchange(), rabbit_types:delivery()) -> -%% [rabbit_amqqueue:name() | rabbit_exchange:name()]. +%% Allows additional destinations to be added to the routing decision. +-callback route(rabbit_types:exchange(), rabbit_types:delivery()) -> + [rabbit_amqqueue:name() | rabbit_exchange:name()] | ok. + +%% Whether the decorator wishes to receive callbacks for the exchange +%% none:no callbacks, noroute:all callbacks except route, all:all callbacks +-callback active_for(rabbit_types:exchange()) -> 'none' | 'noroute' | 'all'. -else. @@ -68,8 +75,29 @@ behaviour_info(callbacks) -> [{description, 0}, {serialise_events, 1}, {create, 2}, {delete, 3}, - {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3}]; + {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3}, + {route, 2}, {active_for, 1}]; behaviour_info(_Other) -> undefined. -endif. + +%%---------------------------------------------------------------------------- + +list() -> [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)]. + +%% select a subset of active decorators +select(all, {Route, NoRoute}) -> Route ++ NoRoute; +select(route, {Route, _NoRoute}) -> Route. + +set(X) -> + X#exchange{ + decorators = + lists:foldl(fun (D, {Route, NoRoute}) -> + Callbacks = D:active_for(X), + {cons_if_eq(all, Callbacks, D, Route), + cons_if_eq(noroute, Callbacks, D, NoRoute)} + end, {[], []}, list())}. + +cons_if_eq(Select, Select, Item, List) -> [Item | List]; +cons_if_eq(_Select, _Other, _Item, List) -> List. diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index 7398cd2d..d276c2fb 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -46,7 +46,8 @@ name0(undefined) -> none; name0(Policy) -> pget(name, Policy). set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)}; -set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}. +set(X = #exchange{name = Name}) -> rabbit_exchange_decorator:set( + X#exchange{policy = set0(Name)}). set0(Name = #resource{virtual_host = VHost}) -> match(Name, list(VHost)). @@ -169,10 +170,15 @@ update_policies(VHost) -> update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) -> case match(XName, Policies) of - OldPolicy -> no_change; - NewPolicy -> rabbit_exchange:update( - XName, fun(X1) -> X1#exchange{policy = NewPolicy} end), - {X, X#exchange{policy = NewPolicy}} + OldPolicy -> + no_change; + NewPolicy -> + rabbit_exchange:update( + XName, fun(X1) -> + rabbit_exchange_decorator:set( + X1#exchange{policy = NewPolicy}) + end), + {X, X#exchange{policy = NewPolicy}} end. update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index e7b69879..27b588d1 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -563,8 +563,9 @@ test_topic_matching() -> XName = #resource{virtual_host = <<"/">>, kind = exchange, name = <<"test_exchange">>}, - X = #exchange{name = XName, type = topic, durable = false, - auto_delete = false, arguments = []}, + X0 = #exchange{name = XName, type = topic, durable = false, + auto_delete = false, arguments = []}, + X = rabbit_exchange_decorator:set(X0), %% create rabbit_exchange_type_topic:validate(X), exchange_op_callback(X, create, []), diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 457b1567..b7b1635b 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -43,6 +43,7 @@ -rabbit_upgrade({sync_slave_pids, mnesia, [policy]}). -rabbit_upgrade({no_mirror_nodes, mnesia, [sync_slave_pids]}). -rabbit_upgrade({gm_pids, mnesia, [no_mirror_nodes]}). +-rabbit_upgrade({exchange_decorators, mnesia, [policy]}). %% ------------------------------------------------------------------- @@ -68,6 +69,7 @@ -spec(sync_slave_pids/0 :: () -> 'ok'). -spec(no_mirror_nodes/0 :: () -> 'ok'). -spec(gm_pids/0 :: () -> 'ok'). +-spec(exchange_decorators/0 :: () -> 'ok'). -endif. @@ -282,6 +284,20 @@ gm_pids() -> || T <- Tables], ok. +exchange_decorators() -> + ok = exchange_decorators(rabbit_exchange), + ok = exchange_decorators(rabbit_durable_exchange). + +exchange_decorators(Table) -> + transform( + Table, + fun ({exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches, + Policy}) -> + {exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches, Policy, + {[], []}} + end, + [name, type, durable, auto_delete, internal, arguments, scratches, policy, + decorators]). %%-------------------------------------------------------------------- |