summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTony Garnock-Jones <tonyg@kcbbs.gen.nz>2009-11-21 21:24:15 +0000
committerTony Garnock-Jones <tonyg@kcbbs.gen.nz>2009-11-21 21:24:15 +0000
commit0378154c75db5884f3720cc632e1d2f460c27f7c (patch)
tree193b0e9537ae67eaae1112c0ab1ba59d528e674e
parentb9e643bb45c32d214125ba06f3c95b97147a3f6c (diff)
downloadrabbitmq-server-0378154c75db5884f3720cc632e1d2f460c27f7c.tar.gz
Make exchanges modular behaviours. Added
rabbit_exchange_behaviour. Split fanout, direct, topic and headers into new modules that implement the new behaviour. Added convention that for exchange type X, a module named rabbit_exchange_type_X must exist.
-rw-r--r--Makefile5
-rw-r--r--src/rabbit_access_control.erl12
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_exchange.erl270
-rw-r--r--src/rabbit_exchange_behaviour.erl19
-rw-r--r--src/rabbit_exchange_type_direct.erl20
-rw-r--r--src/rabbit_exchange_type_fanout.erl20
-rw-r--r--src/rabbit_exchange_type_headers.erl93
-rw-r--r--src/rabbit_exchange_type_topic.erl55
-rw-r--r--src/rabbit_misc.erl5
-rw-r--r--src/rabbit_router.erl45
-rw-r--r--src/rabbit_tests.erl2
12 files changed, 340 insertions, 208 deletions
diff --git a/Makefile b/Makefile
index 132a7c19..a9845607 100644
--- a/Makefile
+++ b/Makefile
@@ -51,7 +51,10 @@ $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app
$(EBIN_DIR)/gen_server2.beam: $(SOURCE_DIR)/gen_server2.erl
erlc $(ERLC_OPTS) $<
-$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl $(EBIN_DIR)/gen_server2.beam
+$(EBIN_DIR)/rabbit_exchange_behaviour.beam: $(SOURCE_DIR)/rabbit_exchange_behaviour.erl
+ erlc $(ERLC_OPTS) $<
+
+$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl $(EBIN_DIR)/gen_server2.beam $(EBIN_DIR)/rabbit_exchange_behaviour.beam
erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
# ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 6ff7a104..daf6f5af 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -242,12 +242,12 @@ add_vhost(VHostPath) ->
rabbit_misc:r(VHostPath, exchange, Name),
Type, true, false, []) ||
{Name,Type} <-
- [{<<"">>, direct},
- {<<"amq.direct">>, direct},
- {<<"amq.topic">>, topic},
- {<<"amq.match">>, headers}, %% per 0-9-1 pdf
- {<<"amq.headers">>, headers}, %% per 0-9-1 xml
- {<<"amq.fanout">>, fanout}]],
+ [{<<"">>, rabbit_exchange_type_direct},
+ {<<"amq.direct">>, rabbit_exchange_type_direct},
+ {<<"amq.topic">>, rabbit_exchange_type_topic},
+ {<<"amq.match">>, rabbit_exchange_type_headers}, %% per 0-9-1 pdf
+ {<<"amq.headers">>, rabbit_exchange_type_headers}, %% per 0-9-1 xml
+ {<<"amq.fanout">>, rabbit_exchange_type_fanout}]],
ok;
[_] ->
mnesia:abort({vhost_already_exists, VHostPath})
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index b28574b7..297ed5aa 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -42,7 +42,7 @@
init([DefaultVHost]) ->
#exchange{} = rabbit_exchange:declare(
rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
- topic, true, false, []),
+ rabbit_exchange_type_topic, true, false, []),
{ok, #resource{virtual_host = DefaultVHost,
kind = exchange,
name = ?LOG_EXCH_NAME}}.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 33dea8c7..c742f5ca 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -30,7 +30,6 @@
%%
-module(rabbit_exchange).
--include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
@@ -40,7 +39,7 @@
-export([add_binding/4, delete_binding/4, list_bindings/1]).
-export([delete/2]).
-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
--export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]).
+-export([check_type/1, assert_type/2]).
%% EXTENDED API
-export([list_exchange_bindings/1]).
@@ -49,7 +48,6 @@
-import(mnesia).
-import(sets).
-import(lists).
--import(qlc).
-import(regexp).
%%----------------------------------------------------------------------------
@@ -83,8 +81,6 @@
[{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
-spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok').
-spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok').
--spec(topic_matches/2 :: (binary(), binary()) -> boolean()).
--spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()).
-spec(delete/2 :: (exchange_name(), boolean()) ->
'ok' | not_found() | {'error', 'in_use'}).
-spec(list_queue_bindings/1 :: (queue_name()) ->
@@ -109,7 +105,13 @@ recover() ->
Route, write),
ok = mnesia:write(rabbit_reverse_route,
ReverseRoute, write)
- end, rabbit_durable_route).
+ end, rabbit_durable_route),
+ %% Tell exchanges to recover themselves only *after* we've
+ %% recovered their bindings.
+ ok = rabbit_misc:table_foreach(
+ fun(Exchange = #exchange{type = Type}) ->
+ ok = Type:recover(Exchange)
+ end, rabbit_durable_exchange).
declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
Exchange = #exchange{name = ExchangeName,
@@ -126,22 +128,31 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
Exchange, write);
true -> ok
end,
+ ok = Type:init(Exchange),
Exchange;
[ExistingX] -> ExistingX
end
end).
-check_type(<<"fanout">>) ->
- fanout;
-check_type(<<"direct">>) ->
- direct;
-check_type(<<"topic">>) ->
- topic;
-check_type(<<"headers">>) ->
- headers;
+typename_to_plugin_module(T) when is_binary(T) ->
+ list_to_existing_atom("rabbit_exchange_type_" ++ binary_to_list(T)).
+
+plugin_module_to_typename(M) when is_atom(M) ->
+ "rabbit_exchange_type_" ++ S = atom_to_list(M),
+ list_to_binary(S).
+
check_type(T) ->
- rabbit_misc:protocol_error(
- command_invalid, "invalid exchange type '~s'", [T]).
+ Module = typename_to_plugin_module(T),
+ case catch Module:description() of
+ {'EXIT', {undef, [{_, description, []} | _]}} ->
+ rabbit_misc:protocol_error(
+ command_invalid, "invalid exchange type '~s'", [T]);
+ {'EXIT', _} ->
+ rabbit_misc:protocol_error(
+ command_invalid, "problem loading exchange type '~s'", [T]);
+ _ ->
+ Module
+ end.
assert_type(#exchange{ type = ActualType }, RequiredType)
when ActualType == RequiredType ->
@@ -149,7 +160,9 @@ assert_type(#exchange{ type = ActualType }, RequiredType)
assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) ->
rabbit_misc:protocol_error(
not_allowed, "cannot redeclare ~s of type '~s' with type '~s'",
- [rabbit_misc:rs(Name), ActualType, RequiredType]).
+ [rabbit_misc:rs(Name),
+ plugin_module_to_typename(ActualType),
+ plugin_module_to_typename(RequiredType)]).
lookup(Name) ->
rabbit_misc:dirty_read({rabbit_exchange, Name}).
@@ -173,7 +186,7 @@ map(VHostPath, F) ->
infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items].
i(name, #exchange{name = Name}) -> Name;
-i(type, #exchange{type = Type}) -> Type;
+i(type, #exchange{type = Type}) -> plugin_module_to_typename(Type);
i(durable, #exchange{durable = Durable}) -> Durable;
i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete;
i(arguments, #exchange{arguments = Arguments}) -> Arguments;
@@ -228,68 +241,8 @@ publish(X, Seen, Delivery = #delivery{
%% The function ensures that a qpid appears in the return list exactly
%% as many times as a message should be delivered to it. With the
%% current exchange types that is at most once.
-route(X = #exchange{type = topic}, RoutingKey, _Content) ->
- match_bindings(X, fun (#binding{key = BindingKey}) ->
- topic_matches(BindingKey, RoutingKey)
- end);
-
-route(X = #exchange{type = headers}, _RoutingKey, Content) ->
- Headers = case (Content#content.properties)#'P_basic'.headers of
- undefined -> [];
- H -> sort_arguments(H)
- end,
- match_bindings(X, fun (#binding{args = Spec}) ->
- headers_match(Spec, Headers)
- end);
-
-route(X = #exchange{type = fanout}, _RoutingKey, _Content) ->
- match_routing_key(X, '_');
-
-route(X = #exchange{type = direct}, RoutingKey, _Content) ->
- match_routing_key(X, RoutingKey).
-
-sort_arguments(Arguments) ->
- lists:keysort(1, Arguments).
-
-%% TODO: Maybe this should be handled by a cursor instead.
-%% TODO: This causes a full scan for each entry with the same exchange
-match_bindings(#exchange{name = Name}, Match) ->
- Query = qlc:q([QName || #route{binding = Binding = #binding{
- exchange_name = ExchangeName,
- queue_name = QName}} <-
- mnesia:table(rabbit_route),
- ExchangeName == Name,
- Match(Binding)]),
- lookup_qpids(
- try
- mnesia:async_dirty(fun qlc:e/1, [Query])
- catch exit:{aborted, {badarg, _}} ->
- %% work around OTP-7025, which was fixed in R12B-1, by
- %% falling back on a less efficient method
- [QName || #route{binding = Binding = #binding{
- queue_name = QName}} <-
- mnesia:dirty_match_object(
- rabbit_route,
- #route{binding = #binding{exchange_name = Name,
- _ = '_'}}),
- Match(Binding)]
- end).
-
-match_routing_key(#exchange{name = Name}, RoutingKey) ->
- MatchHead = #route{binding = #binding{exchange_name = Name,
- queue_name = '$1',
- key = RoutingKey,
- _ = '_'}},
- lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])).
-
-lookup_qpids(Queues) ->
- sets:fold(
- fun(Key, Acc) ->
- case mnesia:dirty_read({rabbit_queue, Key}) of
- [#amqqueue{pid = QPid}] -> [QPid | Acc];
- [] -> Acc
- end
- end, [], sets:from_list(Queues)).
+route(X = #exchange{type = Type}, RoutingKey, Content) ->
+ Type:route(X, RoutingKey, Content).
%% TODO: Should all of the route and binding management not be
%% refactored to its own module, especially seeing as unbind will have
@@ -314,21 +267,36 @@ delete_transient_queue_bindings(QueueName) ->
delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1).
delete_queue_bindings(QueueName, FwdDeleteFun) ->
- Exchanges = exchanges_for_queue(QueueName),
- [begin
- ok = FwdDeleteFun(reverse_route(Route)),
- ok = mnesia:delete_object(rabbit_reverse_route, Route, write)
- end || Route <- mnesia:match_object(
- rabbit_reverse_route,
- reverse_route(
- #route{binding = #binding{queue_name = QueueName,
- _ = '_'}}),
- write)],
- [begin
- [X] = mnesia:read({rabbit_exchange, ExchangeName}),
- ok = maybe_auto_delete(X)
- end || ExchangeName <- Exchanges],
- ok.
+ DeletedBindings =
+ [begin
+ FwdRoute = reverse_route(Route),
+ ok = FwdDeleteFun(FwdRoute),
+ ok = mnesia:delete_object(rabbit_reverse_route, Route, write),
+ FwdRoute#route.binding
+ end || Route <- mnesia:match_object(
+ rabbit_reverse_route,
+ reverse_route(
+ #route{binding = #binding{queue_name = QueueName,
+ _ = '_'}}),
+ write)],
+ ok = cleanup_deleted_queue_bindings(lists:keysort(#binding.exchange_name, DeletedBindings),
+ none, []).
+
+cleanup_deleted_queue_bindings([], ExchangeName, Bindings) ->
+ cleanup_deleted_queue_bindings1(ExchangeName, Bindings);
+cleanup_deleted_queue_bindings([B = #binding{exchange_name = N} | Rest], ExchangeName, Bindings)
+ when N =:= ExchangeName ->
+ cleanup_deleted_queue_bindings(Rest, ExchangeName, [B | Bindings]);
+cleanup_deleted_queue_bindings([B = #binding{exchange_name = N} | Rest], ExchangeName, Bindings) ->
+ cleanup_deleted_queue_bindings1(ExchangeName, Bindings),
+ cleanup_deleted_queue_bindings(Rest, N, [B]).
+
+cleanup_deleted_queue_bindings1(none, []) ->
+ ok;
+cleanup_deleted_queue_bindings1(ExchangeName, Bindings) ->
+ [X = #exchange{type = Type}] = mnesia:read({rabbit_exchange, ExchangeName}),
+ [ok = Type:delete_binding(X, B) || B <- Bindings],
+ ok = maybe_auto_delete(X).
delete_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write),
@@ -337,15 +305,6 @@ delete_forward_routes(Route) ->
delete_transient_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write).
-exchanges_for_queue(QueueName) ->
- MatchHead = reverse_route(
- #route{binding = #binding{exchange_name = '$1',
- queue_name = QueueName,
- _ = '_'}}),
- sets:to_list(
- sets:from_list(
- mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))).
-
contains(Table, MatchHead) ->
try
continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read))
@@ -384,23 +343,25 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) ->
add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
binding_action(
ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X, Q, B) ->
+ fun (X = #exchange{type = Type}, Q, B) ->
if Q#amqqueue.durable and not(X#exchange.durable) ->
{error, durability_settings_incompatible};
true -> ok = sync_binding(B, Q#amqqueue.durable,
- fun mnesia:write/3)
+ fun mnesia:write/3),
+ ok = Type:add_binding(X, B)
end
end).
delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
binding_action(
ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X, Q, B) ->
+ fun (X = #exchange{type = Type}, Q, B) ->
case mnesia:match_object(rabbit_route, #route{binding = B},
write) of
[] -> {error, binding_not_found};
_ -> ok = sync_binding(B, Q#amqqueue.durable,
fun mnesia:delete_object/3),
+ ok = Type:delete_binding(X, B),
maybe_auto_delete(X)
end
end).
@@ -412,7 +373,7 @@ binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) ->
Fun(X, Q, #binding{exchange_name = ExchangeName,
queue_name = QueueName,
key = RoutingKey,
- args = sort_arguments(Arguments)})
+ args = rabbit_misc:sort_arguments(Arguments)})
end).
sync_binding(Binding, Durable, Fun) ->
@@ -470,94 +431,6 @@ reverse_binding(#binding{exchange_name = Exchange,
key = Key,
args = Args}.
-default_headers_match_kind() -> all.
-
-parse_x_match(<<"all">>) -> all;
-parse_x_match(<<"any">>) -> any;
-parse_x_match(Other) ->
- rabbit_log:warning("Invalid x-match field value ~p; expected all or any",
- [Other]),
- default_headers_match_kind().
-
-%% Horrendous matching algorithm. Depends for its merge-like
-%% (linear-time) behaviour on the lists:keysort (sort_arguments) that
-%% route/3 and {add,delete}_binding/4 do.
-%%
-%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
-%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
-%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
-%%
-headers_match(Pattern, Data) ->
- MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of
- {value, {_, longstr, MK}} -> parse_x_match(MK);
- {value, {_, Type, MK}} ->
- rabbit_log:warning("Invalid x-match field type ~p "
- "(value ~p); expected longstr",
- [Type, MK]),
- default_headers_match_kind();
- _ -> default_headers_match_kind()
- end,
- headers_match(Pattern, Data, true, false, MatchKind).
-
-headers_match([], _Data, AllMatch, _AnyMatch, all) ->
- AllMatch;
-headers_match([], _Data, _AllMatch, AnyMatch, any) ->
- AnyMatch;
-headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data,
- AllMatch, AnyMatch, MatchKind) ->
- headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind);
-headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) ->
- headers_match([], [], false, AnyMatch, MatchKind);
-headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest],
- AllMatch, AnyMatch, MatchKind) when PK > DK ->
- headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind);
-headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _],
- _AllMatch, AnyMatch, MatchKind) when PK < DK ->
- headers_match(PRest, Data, false, AnyMatch, MatchKind);
-headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
- AllMatch, AnyMatch, MatchKind) when PK == DK ->
- {AllMatch1, AnyMatch1} =
- if
- %% It's not properly specified, but a "no value" in a
- %% pattern field is supposed to mean simple presence of
- %% the corresponding data field. I've interpreted that to
- %% mean a type of "void" for the pattern field.
- PT == void -> {AllMatch, true};
- %% Similarly, it's not specified, but I assume that a
- %% mismatched type causes a mismatched value.
- PT =/= DT -> {false, AnyMatch};
- PV == DV -> {AllMatch, true};
- true -> {false, AnyMatch}
- end,
- headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind).
-
-split_topic_key(Key) ->
- {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."),
- KeySplit.
-
-topic_matches(PatternKey, RoutingKey) ->
- P = split_topic_key(PatternKey),
- R = split_topic_key(RoutingKey),
- topic_matches1(P, R).
-
-topic_matches1(["#"], _R) ->
- true;
-topic_matches1(["#" | PTail], R) ->
- last_topic_match(PTail, [], lists:reverse(R));
-topic_matches1([], []) ->
- true;
-topic_matches1(["*" | PatRest], [_ | ValRest]) ->
- topic_matches1(PatRest, ValRest);
-topic_matches1([PatElement | PatRest], [ValElement | ValRest]) when PatElement == ValElement ->
- topic_matches1(PatRest, ValRest);
-topic_matches1(_, _) ->
- false.
-
-last_topic_match(P, R, []) ->
- topic_matches1(P, R);
-last_topic_match(P, R, [BacktrackNext | BacktrackList]) ->
- topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList).
-
delete(ExchangeName, _IfUnused = true) ->
call_with_exchange(ExchangeName, fun conditional_delete/1);
delete(ExchangeName, _IfUnused = false) ->
@@ -579,10 +452,11 @@ conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
true -> {error, in_use}
end.
-unconditional_delete(#exchange{name = ExchangeName}) ->
+unconditional_delete(X = #exchange{name = ExchangeName, type = Type}) ->
ok = delete_exchange_bindings(ExchangeName),
ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}),
- ok = mnesia:delete({rabbit_exchange, ExchangeName}).
+ ok = mnesia:delete({rabbit_exchange, ExchangeName}),
+ ok = Type:delete(X).
%%----------------------------------------------------------------------------
%% EXTENDED API
diff --git a/src/rabbit_exchange_behaviour.erl b/src/rabbit_exchange_behaviour.erl
new file mode 100644
index 00000000..0ca1e95f
--- /dev/null
+++ b/src/rabbit_exchange_behaviour.erl
@@ -0,0 +1,19 @@
+-module(rabbit_exchange_behaviour).
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [
+ %% Called *outside* mnesia transactions.
+ {description,0},
+ {route,3},
+
+ %% Called *inside* mnesia transactions, must be idempotent.
+ {recover,1}, %% like init, but called on server startup for durable exchanges
+ {init,1}, %% like recover, but called on declaration when previously absent
+ {delete,1}, %% called on deletion
+ {add_binding, 2},
+ {delete_binding, 2}
+ ];
+behaviour_info(_Other) ->
+ undefined.
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
new file mode 100644
index 00000000..b2de8f2c
--- /dev/null
+++ b/src/rabbit_exchange_type_direct.erl
@@ -0,0 +1,20 @@
+-module(rabbit_exchange_type_direct).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_exchange_behaviour).
+
+-export([description/0, route/3]).
+-export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]).
+
+description() ->
+ [{name, <<"direct">>},
+ {description, <<"AMQP direct exchange, as per the AMQP specification">>}].
+
+route(#exchange{name = Name}, RoutingKey, _Content) ->
+ rabbit_router:match_routing_key(Name, RoutingKey).
+
+recover(_X) -> ok.
+init(_X) -> ok.
+delete(_X) -> ok.
+add_binding(_X, _B) -> ok.
+delete_binding(_X, _B) -> ok.
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
new file mode 100644
index 00000000..68a779e6
--- /dev/null
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -0,0 +1,20 @@
+-module(rabbit_exchange_type_fanout).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_exchange_behaviour).
+
+-export([description/0, route/3]).
+-export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]).
+
+description() ->
+ [{name, <<"fanout">>},
+ {description, <<"AMQP fanout exchange, as per the AMQP specification">>}].
+
+route(#exchange{name = Name}, _RoutingKey, _Content) ->
+ rabbit_router:match_routing_key(Name, '_').
+
+recover(_X) -> ok.
+init(_X) -> ok.
+delete(_X) -> ok.
+add_binding(_X, _B) -> ok.
+delete_binding(_X, _B) -> ok.
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
new file mode 100644
index 00000000..e539aca4
--- /dev/null
+++ b/src/rabbit_exchange_type_headers.erl
@@ -0,0 +1,93 @@
+-module(rabbit_exchange_type_headers).
+-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
+
+-behaviour(rabbit_exchange_behaviour).
+
+-export([description/0, route/3]).
+-export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]).
+
+-ifdef(use_specs).
+-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()).
+-endif.
+
+description() ->
+ [{name, <<"headers">>},
+ {description, <<"AMQP headers exchange, as per the AMQP specification">>}].
+
+route(#exchange{name = Name}, _RoutingKey, Content) ->
+ Headers = case (Content#content.properties)#'P_basic'.headers of
+ undefined -> [];
+ H -> rabbit_misc:sort_arguments(H)
+ end,
+ rabbit_router:match_bindings(Name, fun (#binding{args = Spec}) ->
+ headers_match(Spec, Headers)
+ end).
+
+default_headers_match_kind() -> all.
+
+parse_x_match(<<"all">>) -> all;
+parse_x_match(<<"any">>) -> any;
+parse_x_match(Other) ->
+ rabbit_log:warning("Invalid x-match field value ~p; expected all or any",
+ [Other]),
+ default_headers_match_kind().
+
+%% Horrendous matching algorithm. Depends for its merge-like
+%% (linear-time) behaviour on the lists:keysort
+%% (rabbit_misc:sort_arguments) that route/3 and
+%% rabbit_exchange:{add,delete}_binding/4 do.
+%%
+%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
+%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+%%
+headers_match(Pattern, Data) ->
+ MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of
+ {value, {_, longstr, MK}} -> parse_x_match(MK);
+ {value, {_, Type, MK}} ->
+ rabbit_log:warning("Invalid x-match field type ~p "
+ "(value ~p); expected longstr",
+ [Type, MK]),
+ default_headers_match_kind();
+ _ -> default_headers_match_kind()
+ end,
+ headers_match(Pattern, Data, true, false, MatchKind).
+
+headers_match([], _Data, AllMatch, _AnyMatch, all) ->
+ AllMatch;
+headers_match([], _Data, _AllMatch, AnyMatch, any) ->
+ AnyMatch;
+headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data,
+ AllMatch, AnyMatch, MatchKind) ->
+ headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind);
+headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) ->
+ headers_match([], [], false, AnyMatch, MatchKind);
+headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest],
+ AllMatch, AnyMatch, MatchKind) when PK > DK ->
+ headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind);
+headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _],
+ _AllMatch, AnyMatch, MatchKind) when PK < DK ->
+ headers_match(PRest, Data, false, AnyMatch, MatchKind);
+headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
+ AllMatch, AnyMatch, MatchKind) when PK == DK ->
+ {AllMatch1, AnyMatch1} =
+ if
+ %% It's not properly specified, but a "no value" in a
+ %% pattern field is supposed to mean simple presence of
+ %% the corresponding data field. I've interpreted that to
+ %% mean a type of "void" for the pattern field.
+ PT == void -> {AllMatch, true};
+ %% Similarly, it's not specified, but I assume that a
+ %% mismatched type causes a mismatched value.
+ PT =/= DT -> {false, AnyMatch};
+ PV == DV -> {AllMatch, true};
+ true -> {false, AnyMatch}
+ end,
+ headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind).
+
+recover(_X) -> ok.
+init(_X) -> ok.
+delete(_X) -> ok.
+add_binding(_X, _B) -> ok.
+delete_binding(_X, _B) -> ok.
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
new file mode 100644
index 00000000..341fbaca
--- /dev/null
+++ b/src/rabbit_exchange_type_topic.erl
@@ -0,0 +1,55 @@
+-module(rabbit_exchange_type_topic).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_exchange_behaviour).
+
+-export([description/0, route/3]).
+-export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]).
+
+-export([topic_matches/2]).
+
+-ifdef(use_specs).
+-spec(topic_matches/2 :: (binary(), binary()) -> boolean()).
+-endif.
+
+description() ->
+ [{name, <<"topic">>},
+ {description, <<"AMQP topic exchange, as per the AMQP specification">>}].
+
+route(#exchange{name = Name}, RoutingKey, _Content) ->
+ rabbit_router:match_bindings(Name, fun (#binding{key = BindingKey}) ->
+ topic_matches(BindingKey, RoutingKey)
+ end).
+
+split_topic_key(Key) ->
+ {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."),
+ KeySplit.
+
+topic_matches(PatternKey, RoutingKey) ->
+ P = split_topic_key(PatternKey),
+ R = split_topic_key(RoutingKey),
+ topic_matches1(P, R).
+
+topic_matches1(["#"], _R) ->
+ true;
+topic_matches1(["#" | PTail], R) ->
+ last_topic_match(PTail, [], lists:reverse(R));
+topic_matches1([], []) ->
+ true;
+topic_matches1(["*" | PatRest], [_ | ValRest]) ->
+ topic_matches1(PatRest, ValRest);
+topic_matches1([PatElement | PatRest], [ValElement | ValRest]) when PatElement == ValElement ->
+ topic_matches1(PatRest, ValRest);
+topic_matches1(_, _) ->
+ false.
+
+last_topic_match(P, R, []) ->
+ topic_matches1(P, R);
+last_topic_match(P, R, [BacktrackNext | BacktrackList]) ->
+ topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList).
+
+recover(_X) -> ok.
+init(_X) -> ok.
+delete(_X) -> ok.
+add_binding(_X, _B) -> ok.
+delete_binding(_X, _B) -> ok.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 21764fce..ef3e0309 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -56,6 +56,7 @@
-export([format_stderr/2]).
-export([start_applications/1, stop_applications/1]).
-export([unfold/2, ceil/1]).
+-export([sort_arguments/1]).
-import(mnesia).
-import(lists).
@@ -489,3 +490,7 @@ ceil(N) ->
0 -> N;
_ -> 1 + T
end.
+
+%% Sorts a list of AMQP table fields as per the AMQP spec
+sort_arguments(Arguments) ->
+ lists:keysort(1, Arguments).
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 10f80cc3..afaf9d45 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -30,12 +30,15 @@
%%
-module(rabbit_router).
+-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
-behaviour(gen_server2).
-export([start_link/0,
- deliver/2]).
+ deliver/2,
+ match_bindings/2,
+ match_routing_key/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -129,6 +132,46 @@ deliver_per_node(NodeQPids, Delivery) ->
-endif.
+%% TODO: Maybe this should be handled by a cursor instead.
+%% TODO: This causes a full scan for each entry with the same exchange
+match_bindings(Name, Match) ->
+ Query = qlc:q([QName || #route{binding = Binding = #binding{
+ exchange_name = ExchangeName,
+ queue_name = QName}} <-
+ mnesia:table(rabbit_route),
+ ExchangeName == Name,
+ Match(Binding)]),
+ lookup_qpids(
+ try
+ mnesia:async_dirty(fun qlc:e/1, [Query])
+ catch exit:{aborted, {badarg, _}} ->
+ %% work around OTP-7025, which was fixed in R12B-1, by
+ %% falling back on a less efficient method
+ [QName || #route{binding = Binding = #binding{
+ queue_name = QName}} <-
+ mnesia:dirty_match_object(
+ rabbit_route,
+ #route{binding = #binding{exchange_name = Name,
+ _ = '_'}}),
+ Match(Binding)]
+ end).
+
+match_routing_key(Name, RoutingKey) ->
+ MatchHead = #route{binding = #binding{exchange_name = Name,
+ queue_name = '$1',
+ key = RoutingKey,
+ _ = '_'}},
+ lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])).
+
+lookup_qpids(Queues) ->
+ sets:fold(
+ fun(Key, Acc) ->
+ case mnesia:dirty_read({rabbit_queue, Key}) of
+ [#amqqueue{pid = QPid}] -> [QPid | Acc];
+ [] -> Acc
+ end
+ end, [], sets:from_list(Queues)).
+
%%--------------------------------------------------------------------
init([]) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index ba048184..99f1bc67 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -302,7 +302,7 @@ test_topic_match(P, R) ->
test_topic_match(P, R, true).
test_topic_match(P, R, Expected) ->
- case rabbit_exchange:topic_matches(list_to_binary(P), list_to_binary(R)) of
+ case rabbit_exchange_type_topic:topic_matches(list_to_binary(P), list_to_binary(R)) of
Expected ->
passed;
_ ->