summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTony Garnock-Jones <tonyg@kcbbs.gen.nz>2009-11-21 23:23:35 +0000
committerTony Garnock-Jones <tonyg@kcbbs.gen.nz>2009-11-21 23:23:35 +0000
commit07c8651009d557e8f0e499c5d123768a345de35d (patch)
treee66c200d7f2f951d0a05288d1fd1668b9b0b8855
parent0378154c75db5884f3720cc632e1d2f460c27f7c (diff)
downloadrabbitmq-server-07c8651009d557e8f0e499c5d123768a345de35d.tar.gz
Change to publish (= route-and-deliver) model for exchanges.
This lets exchanges do arbitrary delivery inspection for routing, arbitrary content rewriting or discarding, and arbitrary content and delivery synthesis.
-rw-r--r--src/rabbit_exchange.erl14
-rw-r--r--src/rabbit_exchange_behaviour.erl10
-rw-r--r--src/rabbit_exchange_type_direct.erl7
-rw-r--r--src/rabbit_exchange_type_fanout.erl6
-rw-r--r--src/rabbit_exchange_type_headers.erl12
-rw-r--r--src/rabbit_exchange_type_topic.erl13
6 files changed, 29 insertions, 33 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index c742f5ca..9e0fca78 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -203,9 +203,8 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
publish(X, Delivery) ->
publish(X, [], Delivery).
-publish(X, Seen, Delivery = #delivery{
- message = #basic_message{routing_key = RK, content = C}}) ->
- case rabbit_router:deliver(route(X, RK, C), Delivery) of
+publish(X = #exchange{type = Type}, Seen, Delivery) ->
+ case Type:publish(X, Delivery) of
{_, []} = R ->
#exchange{name = XName, arguments = Args} = X,
case rabbit_misc:r_arg(XName, exchange, Args,
@@ -235,15 +234,6 @@ publish(X, Seen, Delivery = #delivery{
R
end.
-%% return the list of qpids to which a message with a given routing
-%% key, sent to a particular exchange, should be delivered.
-%%
-%% The function ensures that a qpid appears in the return list exactly
-%% as many times as a message should be delivered to it. With the
-%% current exchange types that is at most once.
-route(X = #exchange{type = Type}, RoutingKey, Content) ->
- Type:route(X, RoutingKey, Content).
-
%% TODO: Should all of the route and binding management not be
%% refactored to its own module, especially seeing as unbind will have
%% to be implemented for 0.91 ?
diff --git a/src/rabbit_exchange_behaviour.erl b/src/rabbit_exchange_behaviour.erl
index 0ca1e95f..3681124c 100644
--- a/src/rabbit_exchange_behaviour.erl
+++ b/src/rabbit_exchange_behaviour.erl
@@ -5,13 +5,13 @@
behaviour_info(callbacks) ->
[
%% Called *outside* mnesia transactions.
- {description,0},
- {route,3},
+ {description, 0},
+ {publish, 2},
%% Called *inside* mnesia transactions, must be idempotent.
- {recover,1}, %% like init, but called on server startup for durable exchanges
- {init,1}, %% like recover, but called on declaration when previously absent
- {delete,1}, %% called on deletion
+ {recover, 1}, %% like init, but called on server startup for durable exchanges
+ {init, 1}, %% like recover, but called on declaration when previously absent
+ {delete, 1}, %% called on deletion
{add_binding, 2},
{delete_binding, 2}
];
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index b2de8f2c..2eacf8cf 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -3,15 +3,16 @@
-behaviour(rabbit_exchange_behaviour).
--export([description/0, route/3]).
+-export([description/0, publish/2]).
-export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]).
description() ->
[{name, <<"direct">>},
{description, <<"AMQP direct exchange, as per the AMQP specification">>}].
-route(#exchange{name = Name}, RoutingKey, _Content) ->
- rabbit_router:match_routing_key(Name, RoutingKey).
+publish(#exchange{name = Name},
+ Delivery = #delivery{message = #basic_message{routing_key = RoutingKey}}) ->
+ rabbit_router:deliver(rabbit_router:match_routing_key(Name, RoutingKey), Delivery).
recover(_X) -> ok.
init(_X) -> ok.
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index 68a779e6..782deda7 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -3,15 +3,15 @@
-behaviour(rabbit_exchange_behaviour).
--export([description/0, route/3]).
+-export([description/0, publish/2]).
-export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]).
description() ->
[{name, <<"fanout">>},
{description, <<"AMQP fanout exchange, as per the AMQP specification">>}].
-route(#exchange{name = Name}, _RoutingKey, _Content) ->
- rabbit_router:match_routing_key(Name, '_').
+publish(#exchange{name = Name}, Delivery) ->
+ rabbit_router:deliver(rabbit_router:match_routing_key(Name, '_'), Delivery).
recover(_X) -> ok.
init(_X) -> ok.
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index e539aca4..00f7ea0f 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -4,7 +4,7 @@
-behaviour(rabbit_exchange_behaviour).
--export([description/0, route/3]).
+-export([description/0, publish/2]).
-export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]).
-ifdef(use_specs).
@@ -15,14 +15,16 @@ description() ->
[{name, <<"headers">>},
{description, <<"AMQP headers exchange, as per the AMQP specification">>}].
-route(#exchange{name = Name}, _RoutingKey, Content) ->
+publish(#exchange{name = Name},
+ Delivery = #delivery{message = #basic_message{content = Content}}) ->
Headers = case (Content#content.properties)#'P_basic'.headers of
undefined -> [];
H -> rabbit_misc:sort_arguments(H)
end,
- rabbit_router:match_bindings(Name, fun (#binding{args = Spec}) ->
- headers_match(Spec, Headers)
- end).
+ rabbit_router:deliver(rabbit_router:match_bindings(Name, fun (#binding{args = Spec}) ->
+ headers_match(Spec, Headers)
+ end),
+ Delivery).
default_headers_match_kind() -> all.
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 341fbaca..3461a736 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -3,7 +3,7 @@
-behaviour(rabbit_exchange_behaviour).
--export([description/0, route/3]).
+-export([description/0, publish/2]).
-export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]).
-export([topic_matches/2]).
@@ -16,10 +16,13 @@ description() ->
[{name, <<"topic">>},
{description, <<"AMQP topic exchange, as per the AMQP specification">>}].
-route(#exchange{name = Name}, RoutingKey, _Content) ->
- rabbit_router:match_bindings(Name, fun (#binding{key = BindingKey}) ->
- topic_matches(BindingKey, RoutingKey)
- end).
+publish(#exchange{name = Name},
+ Delivery = #delivery{message = #basic_message{routing_key = RoutingKey}}) ->
+ rabbit_router:deliver(rabbit_router:match_bindings(Name,
+ fun (#binding{key = BindingKey}) ->
+ topic_matches(BindingKey, RoutingKey)
+ end),
+ Delivery).
split_topic_key(Key) ->
{ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."),