summaryrefslogtreecommitdiff
path: root/src/rabbit_exchange.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_exchange.erl')
-rw-r--r--src/rabbit_exchange.erl293
1 files changed, 208 insertions, 85 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 09ea1e96..33dea8c7 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -30,6 +30,7 @@
%%
-module(rabbit_exchange).
+-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
@@ -39,7 +40,7 @@
-export([add_binding/4, delete_binding/4, list_bindings/1]).
-export([delete/2]).
-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
--export([check_type/1, assert_type/2]).
+-export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]).
%% EXTENDED API
-export([list_exchange_bindings/1]).
@@ -48,6 +49,7 @@
-import(mnesia).
-import(sets).
-import(lists).
+-import(qlc).
-import(regexp).
%%----------------------------------------------------------------------------
@@ -81,6 +83,8 @@
[{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
-spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok').
-spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok').
+-spec(topic_matches/2 :: (binary(), binary()) -> boolean()).
+-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()).
-spec(delete/2 :: (exchange_name(), boolean()) ->
'ok' | not_found() | {'error', 'in_use'}).
-spec(list_queue_bindings/1 :: (queue_name()) ->
@@ -105,13 +109,7 @@ recover() ->
Route, write),
ok = mnesia:write(rabbit_reverse_route,
ReverseRoute, write)
- end, rabbit_durable_route),
- %% Tell exchanges to recover themselves only *after* we've
- %% recovered their bindings.
- ok = rabbit_misc:table_foreach(
- fun(Exchange = #exchange{type = Type}) ->
- ok = Type:recover(Exchange)
- end, rabbit_durable_exchange).
+ end, rabbit_durable_route).
declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
Exchange = #exchange{name = ExchangeName,
@@ -128,37 +126,22 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
Exchange, write);
true -> ok
end,
- ok = Type:init(Exchange),
Exchange;
[ExistingX] -> ExistingX
end
end).
-typename_to_plugin_module(T) when is_binary(T) ->
- case catch list_to_existing_atom("rabbit_exchange_type_" ++ binary_to_list(T)) of
- {'EXIT', {badarg, _}} ->
- rabbit_misc:protocol_error(
- command_invalid, "invalid exchange type '~s'", [T]);
- Module ->
- Module
- end.
-
-plugin_module_to_typename(M) when is_atom(M) ->
- "rabbit_exchange_type_" ++ S = atom_to_list(M),
- list_to_binary(S).
-
+check_type(<<"fanout">>) ->
+ fanout;
+check_type(<<"direct">>) ->
+ direct;
+check_type(<<"topic">>) ->
+ topic;
+check_type(<<"headers">>) ->
+ headers;
check_type(T) ->
- Module = typename_to_plugin_module(T),
- case catch Module:description() of
- {'EXIT', {undef, [{_, description, []} | _]}} ->
- rabbit_misc:protocol_error(
- command_invalid, "invalid exchange type '~s'", [T]);
- {'EXIT', _} ->
- rabbit_misc:protocol_error(
- command_invalid, "problem loading exchange type '~s'", [T]);
- _ ->
- Module
- end.
+ rabbit_misc:protocol_error(
+ command_invalid, "invalid exchange type '~s'", [T]).
assert_type(#exchange{ type = ActualType }, RequiredType)
when ActualType == RequiredType ->
@@ -166,9 +149,7 @@ assert_type(#exchange{ type = ActualType }, RequiredType)
assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) ->
rabbit_misc:protocol_error(
not_allowed, "cannot redeclare ~s of type '~s' with type '~s'",
- [rabbit_misc:rs(Name),
- plugin_module_to_typename(ActualType),
- plugin_module_to_typename(RequiredType)]).
+ [rabbit_misc:rs(Name), ActualType, RequiredType]).
lookup(Name) ->
rabbit_misc:dirty_read({rabbit_exchange, Name}).
@@ -192,7 +173,7 @@ map(VHostPath, F) ->
infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items].
i(name, #exchange{name = Name}) -> Name;
-i(type, #exchange{type = Type}) -> plugin_module_to_typename(Type);
+i(type, #exchange{type = Type}) -> Type;
i(durable, #exchange{durable = Durable}) -> Durable;
i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete;
i(arguments, #exchange{arguments = Arguments}) -> Arguments;
@@ -209,8 +190,9 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
publish(X, Delivery) ->
publish(X, [], Delivery).
-publish(X = #exchange{type = Type}, Seen, Delivery) ->
- case Type:publish(X, Delivery) of
+publish(X, Seen, Delivery = #delivery{
+ message = #basic_message{routing_key = RK, content = C}}) ->
+ case rabbit_router:deliver(route(X, RK, C), Delivery) of
{_, []} = R ->
#exchange{name = XName, arguments = Args} = X,
case rabbit_misc:r_arg(XName, exchange, Args,
@@ -240,6 +222,75 @@ publish(X = #exchange{type = Type}, Seen, Delivery) ->
R
end.
+%% return the list of qpids to which a message with a given routing
+%% key, sent to a particular exchange, should be delivered.
+%%
+%% The function ensures that a qpid appears in the return list exactly
+%% as many times as a message should be delivered to it. With the
+%% current exchange types that is at most once.
+route(X = #exchange{type = topic}, RoutingKey, _Content) ->
+ match_bindings(X, fun (#binding{key = BindingKey}) ->
+ topic_matches(BindingKey, RoutingKey)
+ end);
+
+route(X = #exchange{type = headers}, _RoutingKey, Content) ->
+ Headers = case (Content#content.properties)#'P_basic'.headers of
+ undefined -> [];
+ H -> sort_arguments(H)
+ end,
+ match_bindings(X, fun (#binding{args = Spec}) ->
+ headers_match(Spec, Headers)
+ end);
+
+route(X = #exchange{type = fanout}, _RoutingKey, _Content) ->
+ match_routing_key(X, '_');
+
+route(X = #exchange{type = direct}, RoutingKey, _Content) ->
+ match_routing_key(X, RoutingKey).
+
+sort_arguments(Arguments) ->
+ lists:keysort(1, Arguments).
+
+%% TODO: Maybe this should be handled by a cursor instead.
+%% TODO: This causes a full scan for each entry with the same exchange
+match_bindings(#exchange{name = Name}, Match) ->
+ Query = qlc:q([QName || #route{binding = Binding = #binding{
+ exchange_name = ExchangeName,
+ queue_name = QName}} <-
+ mnesia:table(rabbit_route),
+ ExchangeName == Name,
+ Match(Binding)]),
+ lookup_qpids(
+ try
+ mnesia:async_dirty(fun qlc:e/1, [Query])
+ catch exit:{aborted, {badarg, _}} ->
+ %% work around OTP-7025, which was fixed in R12B-1, by
+ %% falling back on a less efficient method
+ [QName || #route{binding = Binding = #binding{
+ queue_name = QName}} <-
+ mnesia:dirty_match_object(
+ rabbit_route,
+ #route{binding = #binding{exchange_name = Name,
+ _ = '_'}}),
+ Match(Binding)]
+ end).
+
+match_routing_key(#exchange{name = Name}, RoutingKey) ->
+ MatchHead = #route{binding = #binding{exchange_name = Name,
+ queue_name = '$1',
+ key = RoutingKey,
+ _ = '_'}},
+ lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])).
+
+lookup_qpids(Queues) ->
+ sets:fold(
+ fun(Key, Acc) ->
+ case mnesia:dirty_read({rabbit_queue, Key}) of
+ [#amqqueue{pid = QPid}] -> [QPid | Acc];
+ [] -> Acc
+ end
+ end, [], sets:from_list(Queues)).
+
%% TODO: Should all of the route and binding management not be
%% refactored to its own module, especially seeing as unbind will have
%% to be implemented for 0.91 ?
@@ -263,43 +314,21 @@ delete_transient_queue_bindings(QueueName) ->
delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1).
delete_queue_bindings(QueueName, FwdDeleteFun) ->
- DeletedBindings =
- [begin
- FwdRoute = reverse_route(Route),
- ok = FwdDeleteFun(FwdRoute),
- ok = mnesia:delete_object(rabbit_reverse_route, Route, write),
- FwdRoute#route.binding
- end || Route <- mnesia:match_object(
- rabbit_reverse_route,
- reverse_route(
- #route{binding = #binding{queue_name = QueueName,
- _ = '_'}}),
- write)],
- %% We need the keysort to group the bindings by exchange name, so
- %% that cleanup_deleted_queue_bindings can inform the exchange of
- %% its vanished bindings before maybe_auto_delete'ing the
- %% exchange.
- ok = cleanup_deleted_queue_bindings(lists:keysort(#binding.exchange_name, DeletedBindings),
- none, []).
-
-%% Requires that its input binding list is sorted in exchange-name
-%% order, so that the grouping of bindings (for passing to
-%% cleanup_deleted_queue_bindings1) works properly.
-cleanup_deleted_queue_bindings([], ExchangeName, Bindings) ->
- cleanup_deleted_queue_bindings1(ExchangeName, Bindings);
-cleanup_deleted_queue_bindings([B = #binding{exchange_name = N} | Rest], ExchangeName, Bindings)
- when N =:= ExchangeName ->
- cleanup_deleted_queue_bindings(Rest, ExchangeName, [B | Bindings]);
-cleanup_deleted_queue_bindings([B = #binding{exchange_name = N} | Rest], ExchangeName, Bindings) ->
- cleanup_deleted_queue_bindings1(ExchangeName, Bindings),
- cleanup_deleted_queue_bindings(Rest, N, [B]).
-
-cleanup_deleted_queue_bindings1(none, []) ->
- ok;
-cleanup_deleted_queue_bindings1(ExchangeName, Bindings) ->
- [X = #exchange{type = Type}] = mnesia:read({rabbit_exchange, ExchangeName}),
- [ok = Type:delete_binding(X, B) || B <- Bindings],
- ok = maybe_auto_delete(X).
+ Exchanges = exchanges_for_queue(QueueName),
+ [begin
+ ok = FwdDeleteFun(reverse_route(Route)),
+ ok = mnesia:delete_object(rabbit_reverse_route, Route, write)
+ end || Route <- mnesia:match_object(
+ rabbit_reverse_route,
+ reverse_route(
+ #route{binding = #binding{queue_name = QueueName,
+ _ = '_'}}),
+ write)],
+ [begin
+ [X] = mnesia:read({rabbit_exchange, ExchangeName}),
+ ok = maybe_auto_delete(X)
+ end || ExchangeName <- Exchanges],
+ ok.
delete_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write),
@@ -308,6 +337,15 @@ delete_forward_routes(Route) ->
delete_transient_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write).
+exchanges_for_queue(QueueName) ->
+ MatchHead = reverse_route(
+ #route{binding = #binding{exchange_name = '$1',
+ queue_name = QueueName,
+ _ = '_'}}),
+ sets:to_list(
+ sets:from_list(
+ mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))).
+
contains(Table, MatchHead) ->
try
continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read))
@@ -346,25 +384,23 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) ->
add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
binding_action(
ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X = #exchange{type = Type}, Q, B) ->
+ fun (X, Q, B) ->
if Q#amqqueue.durable and not(X#exchange.durable) ->
{error, durability_settings_incompatible};
true -> ok = sync_binding(B, Q#amqqueue.durable,
- fun mnesia:write/3),
- ok = Type:add_binding(X, B)
+ fun mnesia:write/3)
end
end).
delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
binding_action(
ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X = #exchange{type = Type}, Q, B) ->
+ fun (X, Q, B) ->
case mnesia:match_object(rabbit_route, #route{binding = B},
write) of
[] -> {error, binding_not_found};
_ -> ok = sync_binding(B, Q#amqqueue.durable,
fun mnesia:delete_object/3),
- ok = Type:delete_binding(X, B),
maybe_auto_delete(X)
end
end).
@@ -376,7 +412,7 @@ binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) ->
Fun(X, Q, #binding{exchange_name = ExchangeName,
queue_name = QueueName,
key = RoutingKey,
- args = rabbit_misc:sort_field_table(Arguments)})
+ args = sort_arguments(Arguments)})
end).
sync_binding(Binding, Durable, Fun) ->
@@ -434,6 +470,94 @@ reverse_binding(#binding{exchange_name = Exchange,
key = Key,
args = Args}.
+default_headers_match_kind() -> all.
+
+parse_x_match(<<"all">>) -> all;
+parse_x_match(<<"any">>) -> any;
+parse_x_match(Other) ->
+ rabbit_log:warning("Invalid x-match field value ~p; expected all or any",
+ [Other]),
+ default_headers_match_kind().
+
+%% Horrendous matching algorithm. Depends for its merge-like
+%% (linear-time) behaviour on the lists:keysort (sort_arguments) that
+%% route/3 and {add,delete}_binding/4 do.
+%%
+%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
+%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+%%
+headers_match(Pattern, Data) ->
+ MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of
+ {value, {_, longstr, MK}} -> parse_x_match(MK);
+ {value, {_, Type, MK}} ->
+ rabbit_log:warning("Invalid x-match field type ~p "
+ "(value ~p); expected longstr",
+ [Type, MK]),
+ default_headers_match_kind();
+ _ -> default_headers_match_kind()
+ end,
+ headers_match(Pattern, Data, true, false, MatchKind).
+
+headers_match([], _Data, AllMatch, _AnyMatch, all) ->
+ AllMatch;
+headers_match([], _Data, _AllMatch, AnyMatch, any) ->
+ AnyMatch;
+headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data,
+ AllMatch, AnyMatch, MatchKind) ->
+ headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind);
+headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) ->
+ headers_match([], [], false, AnyMatch, MatchKind);
+headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest],
+ AllMatch, AnyMatch, MatchKind) when PK > DK ->
+ headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind);
+headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _],
+ _AllMatch, AnyMatch, MatchKind) when PK < DK ->
+ headers_match(PRest, Data, false, AnyMatch, MatchKind);
+headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
+ AllMatch, AnyMatch, MatchKind) when PK == DK ->
+ {AllMatch1, AnyMatch1} =
+ if
+ %% It's not properly specified, but a "no value" in a
+ %% pattern field is supposed to mean simple presence of
+ %% the corresponding data field. I've interpreted that to
+ %% mean a type of "void" for the pattern field.
+ PT == void -> {AllMatch, true};
+ %% Similarly, it's not specified, but I assume that a
+ %% mismatched type causes a mismatched value.
+ PT =/= DT -> {false, AnyMatch};
+ PV == DV -> {AllMatch, true};
+ true -> {false, AnyMatch}
+ end,
+ headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind).
+
+split_topic_key(Key) ->
+ {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."),
+ KeySplit.
+
+topic_matches(PatternKey, RoutingKey) ->
+ P = split_topic_key(PatternKey),
+ R = split_topic_key(RoutingKey),
+ topic_matches1(P, R).
+
+topic_matches1(["#"], _R) ->
+ true;
+topic_matches1(["#" | PTail], R) ->
+ last_topic_match(PTail, [], lists:reverse(R));
+topic_matches1([], []) ->
+ true;
+topic_matches1(["*" | PatRest], [_ | ValRest]) ->
+ topic_matches1(PatRest, ValRest);
+topic_matches1([PatElement | PatRest], [ValElement | ValRest]) when PatElement == ValElement ->
+ topic_matches1(PatRest, ValRest);
+topic_matches1(_, _) ->
+ false.
+
+last_topic_match(P, R, []) ->
+ topic_matches1(P, R);
+last_topic_match(P, R, [BacktrackNext | BacktrackList]) ->
+ topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList).
+
delete(ExchangeName, _IfUnused = true) ->
call_with_exchange(ExchangeName, fun conditional_delete/1);
delete(ExchangeName, _IfUnused = false) ->
@@ -455,11 +579,10 @@ conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
true -> {error, in_use}
end.
-unconditional_delete(X = #exchange{name = ExchangeName, type = Type}) ->
+unconditional_delete(#exchange{name = ExchangeName}) ->
ok = delete_exchange_bindings(ExchangeName),
ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}),
- ok = mnesia:delete({rabbit_exchange, ExchangeName}),
- ok = Type:delete(X).
+ ok = mnesia:delete({rabbit_exchange, ExchangeName}).
%%----------------------------------------------------------------------------
%% EXTENDED API