diff options
author | Tony Garnock-Jones <tonyg@kcbbs.gen.nz> | 2009-11-21 23:23:35 +0000 |
---|---|---|
committer | Tony Garnock-Jones <tonyg@kcbbs.gen.nz> | 2009-11-21 23:23:35 +0000 |
commit | 07c8651009d557e8f0e499c5d123768a345de35d (patch) | |
tree | e66c200d7f2f951d0a05288d1fd1668b9b0b8855 | |
parent | 0378154c75db5884f3720cc632e1d2f460c27f7c (diff) | |
download | rabbitmq-server-07c8651009d557e8f0e499c5d123768a345de35d.tar.gz |
Change to publish (= route-and-deliver) model for exchanges.
This lets exchanges do arbitrary delivery inspection for routing,
arbitrary content rewriting or discarding, and arbitrary content
and delivery synthesis.
-rw-r--r-- | src/rabbit_exchange.erl | 14 | ||||
-rw-r--r-- | src/rabbit_exchange_behaviour.erl | 10 | ||||
-rw-r--r-- | src/rabbit_exchange_type_direct.erl | 7 | ||||
-rw-r--r-- | src/rabbit_exchange_type_fanout.erl | 6 | ||||
-rw-r--r-- | src/rabbit_exchange_type_headers.erl | 12 | ||||
-rw-r--r-- | src/rabbit_exchange_type_topic.erl | 13 |
6 files changed, 29 insertions, 33 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index c742f5ca..9e0fca78 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -203,9 +203,8 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). publish(X, Delivery) -> publish(X, [], Delivery). -publish(X, Seen, Delivery = #delivery{ - message = #basic_message{routing_key = RK, content = C}}) -> - case rabbit_router:deliver(route(X, RK, C), Delivery) of +publish(X = #exchange{type = Type}, Seen, Delivery) -> + case Type:publish(X, Delivery) of {_, []} = R -> #exchange{name = XName, arguments = Args} = X, case rabbit_misc:r_arg(XName, exchange, Args, @@ -235,15 +234,6 @@ publish(X, Seen, Delivery = #delivery{ R end. -%% return the list of qpids to which a message with a given routing -%% key, sent to a particular exchange, should be delivered. -%% -%% The function ensures that a qpid appears in the return list exactly -%% as many times as a message should be delivered to it. With the -%% current exchange types that is at most once. -route(X = #exchange{type = Type}, RoutingKey, Content) -> - Type:route(X, RoutingKey, Content). - %% TODO: Should all of the route and binding management not be %% refactored to its own module, especially seeing as unbind will have %% to be implemented for 0.91 ? diff --git a/src/rabbit_exchange_behaviour.erl b/src/rabbit_exchange_behaviour.erl index 0ca1e95f..3681124c 100644 --- a/src/rabbit_exchange_behaviour.erl +++ b/src/rabbit_exchange_behaviour.erl @@ -5,13 +5,13 @@ behaviour_info(callbacks) -> [ %% Called *outside* mnesia transactions. - {description,0}, - {route,3}, + {description, 0}, + {publish, 2}, %% Called *inside* mnesia transactions, must be idempotent. - {recover,1}, %% like init, but called on server startup for durable exchanges - {init,1}, %% like recover, but called on declaration when previously absent - {delete,1}, %% called on deletion + {recover, 1}, %% like init, but called on server startup for durable exchanges + {init, 1}, %% like recover, but called on declaration when previously absent + {delete, 1}, %% called on deletion {add_binding, 2}, {delete_binding, 2} ]; diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index b2de8f2c..2eacf8cf 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -3,15 +3,16 @@ -behaviour(rabbit_exchange_behaviour). --export([description/0, route/3]). +-export([description/0, publish/2]). -export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]). description() -> [{name, <<"direct">>}, {description, <<"AMQP direct exchange, as per the AMQP specification">>}]. -route(#exchange{name = Name}, RoutingKey, _Content) -> - rabbit_router:match_routing_key(Name, RoutingKey). +publish(#exchange{name = Name}, + Delivery = #delivery{message = #basic_message{routing_key = RoutingKey}}) -> + rabbit_router:deliver(rabbit_router:match_routing_key(Name, RoutingKey), Delivery). recover(_X) -> ok. init(_X) -> ok. diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index 68a779e6..782deda7 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -3,15 +3,15 @@ -behaviour(rabbit_exchange_behaviour). --export([description/0, route/3]). +-export([description/0, publish/2]). -export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]). description() -> [{name, <<"fanout">>}, {description, <<"AMQP fanout exchange, as per the AMQP specification">>}]. -route(#exchange{name = Name}, _RoutingKey, _Content) -> - rabbit_router:match_routing_key(Name, '_'). +publish(#exchange{name = Name}, Delivery) -> + rabbit_router:deliver(rabbit_router:match_routing_key(Name, '_'), Delivery). recover(_X) -> ok. init(_X) -> ok. diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index e539aca4..00f7ea0f 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -4,7 +4,7 @@ -behaviour(rabbit_exchange_behaviour). --export([description/0, route/3]). +-export([description/0, publish/2]). -export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]). -ifdef(use_specs). @@ -15,14 +15,16 @@ description() -> [{name, <<"headers">>}, {description, <<"AMQP headers exchange, as per the AMQP specification">>}]. -route(#exchange{name = Name}, _RoutingKey, Content) -> +publish(#exchange{name = Name}, + Delivery = #delivery{message = #basic_message{content = Content}}) -> Headers = case (Content#content.properties)#'P_basic'.headers of undefined -> []; H -> rabbit_misc:sort_arguments(H) end, - rabbit_router:match_bindings(Name, fun (#binding{args = Spec}) -> - headers_match(Spec, Headers) - end). + rabbit_router:deliver(rabbit_router:match_bindings(Name, fun (#binding{args = Spec}) -> + headers_match(Spec, Headers) + end), + Delivery). default_headers_match_kind() -> all. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 341fbaca..3461a736 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -3,7 +3,7 @@ -behaviour(rabbit_exchange_behaviour). --export([description/0, route/3]). +-export([description/0, publish/2]). -export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]). -export([topic_matches/2]). @@ -16,10 +16,13 @@ description() -> [{name, <<"topic">>}, {description, <<"AMQP topic exchange, as per the AMQP specification">>}]. -route(#exchange{name = Name}, RoutingKey, _Content) -> - rabbit_router:match_bindings(Name, fun (#binding{key = BindingKey}) -> - topic_matches(BindingKey, RoutingKey) - end). +publish(#exchange{name = Name}, + Delivery = #delivery{message = #basic_message{routing_key = RoutingKey}}) -> + rabbit_router:deliver(rabbit_router:match_bindings(Name, + fun (#binding{key = BindingKey}) -> + topic_matches(BindingKey, RoutingKey) + end), + Delivery). split_topic_key(Key) -> {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."), |