summaryrefslogtreecommitdiff
path: root/src/rabbit_exchange_type_topic.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_exchange_type_topic.erl')
-rw-r--r--src/rabbit_exchange_type_topic.erl282
1 files changed, 236 insertions, 46 deletions
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 9cbf8100..348655b1 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -15,12 +15,13 @@
%%
-module(rabbit_exchange_type_topic).
+
-include("rabbit.hrl").
-behaviour(rabbit_exchange_type).
--export([description/0, route/2]).
--export([validate/1, create/2, recover/2, delete/3, add_binding/3,
+-export([description/0, serialise_events/0, route/2]).
+-export([validate/1, create/2, delete/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -31,58 +32,247 @@
{requires, rabbit_registry},
{enables, kernel_ready}]}).
--export([topic_matches/2]).
-
--ifdef(use_specs).
-
--spec(topic_matches/2 :: (binary(), binary()) -> boolean()).
-
--endif.
+%%----------------------------------------------------------------------------
description() ->
[{name, <<"topic">>},
{description, <<"AMQP topic exchange, as per the AMQP specification">>}].
-route(#exchange{name = Name},
- #delivery{message = #basic_message{routing_key = RoutingKey}}) ->
- rabbit_router:match_bindings(Name,
- fun (#binding{key = BindingKey}) ->
- topic_matches(BindingKey, RoutingKey)
- end).
+serialise_events() -> false.
-split_topic_key(Key) ->
- string:tokens(binary_to_list(Key), ".").
-
-topic_matches(PatternKey, RoutingKey) ->
- P = split_topic_key(PatternKey),
- R = split_topic_key(RoutingKey),
- topic_matches1(P, R).
-
-topic_matches1(["#"], _R) ->
- true;
-topic_matches1(["#" | PTail], R) ->
- last_topic_match(PTail, [], lists:reverse(R));
-topic_matches1([], []) ->
- true;
-topic_matches1(["*" | PatRest], [_ | ValRest]) ->
- topic_matches1(PatRest, ValRest);
-topic_matches1([PatElement | PatRest], [ValElement | ValRest])
- when PatElement == ValElement ->
- topic_matches1(PatRest, ValRest);
-topic_matches1(_, _) ->
- false.
-
-last_topic_match(P, R, []) ->
- topic_matches1(P, R);
-last_topic_match(P, R, [BacktrackNext | BacktrackList]) ->
- topic_matches1(P, R) or
- last_topic_match(P, [BacktrackNext | R], BacktrackList).
+%% NB: This may return duplicate results in some situations (that's ok)
+route(#exchange{name = X},
+ #delivery{message = #basic_message{routing_keys = Routes}}) ->
+ lists:append([begin
+ Words = split_topic_key(RKey),
+ mnesia:async_dirty(fun trie_match/2, [X, Words])
+ end || RKey <- Routes]).
validate(_X) -> ok.
create(_Tx, _X) -> ok.
-recover(_X, _Bs) -> ok.
-delete(_Tx, _X, _Bs) -> ok.
-add_binding(_Tx, _X, _B) -> ok.
-remove_bindings(_Tx, _X, _Bs) -> ok.
+
+delete(transaction, #exchange{name = X}, _Bs) ->
+ trie_remove_all_edges(X),
+ trie_remove_all_bindings(X),
+ ok;
+delete(none, _Exchange, _Bs) ->
+ ok.
+
+add_binding(transaction, _Exchange, Binding) ->
+ internal_add_binding(Binding);
+add_binding(none, _Exchange, _Binding) ->
+ ok.
+
+remove_bindings(transaction, #exchange{name = X}, Bs) ->
+ %% The remove process is split into two distinct phases. In the
+ %% first phase we gather the lists of bindings and edges to
+ %% delete, then in the second phase we process all the
+ %% deletions. This is to prevent interleaving of read/write
+ %% operations in mnesia that can adversely affect performance.
+ {ToDelete, Paths} =
+ lists:foldl(
+ fun(#binding{source = S, key = K, destination = D}, {Acc, PathAcc}) ->
+ Path = [{FinalNode, _} | _] =
+ follow_down_get_path(S, split_topic_key(K)),
+ {[{FinalNode, D} | Acc],
+ decrement_bindings(X, Path, maybe_add_path(X, Path, PathAcc))}
+ end, {[], gb_trees:empty()}, Bs),
+
+ [trie_remove_binding(X, FinalNode, D) || {FinalNode, D} <- ToDelete],
+ [trie_remove_edge(X, Parent, Node, W) ||
+ {Node, {Parent, W, {0, 0}}} <- gb_trees:to_list(Paths)],
+ ok;
+remove_bindings(none, _X, _Bs) ->
+ ok.
+
+maybe_add_path(_X, [{root, none}], PathAcc) ->
+ PathAcc;
+maybe_add_path(X, [{Node, W}, {Parent, _} | _], PathAcc) ->
+ case gb_trees:is_defined(Node, PathAcc) of
+ true -> PathAcc;
+ false -> gb_trees:insert(Node, {Parent, W, {trie_binding_count(X, Node),
+ trie_child_count(X, Node)}},
+ PathAcc)
+ end.
+
+decrement_bindings(X, Path, PathAcc) ->
+ with_path_acc(X, fun({Bindings, Edges}) -> {Bindings - 1, Edges} end,
+ Path, PathAcc).
+
+decrement_edges(X, Path, PathAcc) ->
+ with_path_acc(X, fun({Bindings, Edges}) -> {Bindings, Edges - 1} end,
+ Path, PathAcc).
+
+with_path_acc(_X, _Fun, [{root, none}], PathAcc) ->
+ PathAcc;
+with_path_acc(X, Fun, [{Node, _} | ParentPath], PathAcc) ->
+ {Parent, W, Counts} = gb_trees:get(Node, PathAcc),
+ NewCounts = Fun(Counts),
+ NewPathAcc = gb_trees:update(Node, {Parent, W, NewCounts}, PathAcc),
+ case NewCounts of
+ {0, 0} -> decrement_edges(X, ParentPath,
+ maybe_add_path(X, ParentPath, NewPathAcc));
+ _ -> NewPathAcc
+ end.
+
+
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
+
+%%----------------------------------------------------------------------------
+
+internal_add_binding(#binding{source = X, key = K, destination = D}) ->
+ FinalNode = follow_down_create(X, split_topic_key(K)),
+ trie_add_binding(X, FinalNode, D),
+ ok.
+
+trie_match(X, Words) ->
+ trie_match(X, root, Words, []).
+
+trie_match(X, Node, [], ResAcc) ->
+ trie_match_part(X, Node, "#", fun trie_match_skip_any/4, [],
+ trie_bindings(X, Node) ++ ResAcc);
+trie_match(X, Node, [W | RestW] = Words, ResAcc) ->
+ lists:foldl(fun ({WArg, MatchFun, RestWArg}, Acc) ->
+ trie_match_part(X, Node, WArg, MatchFun, RestWArg, Acc)
+ end, ResAcc, [{W, fun trie_match/4, RestW},
+ {"*", fun trie_match/4, RestW},
+ {"#", fun trie_match_skip_any/4, Words}]).
+
+trie_match_part(X, Node, Search, MatchFun, RestW, ResAcc) ->
+ case trie_child(X, Node, Search) of
+ {ok, NextNode} -> MatchFun(X, NextNode, RestW, ResAcc);
+ error -> ResAcc
+ end.
+
+trie_match_skip_any(X, Node, [], ResAcc) ->
+ trie_match(X, Node, [], ResAcc);
+trie_match_skip_any(X, Node, [_ | RestW] = Words, ResAcc) ->
+ trie_match_skip_any(X, Node, RestW,
+ trie_match(X, Node, Words, ResAcc)).
+
+follow_down_create(X, Words) ->
+ case follow_down_last_node(X, Words) of
+ {ok, FinalNode} -> FinalNode;
+ {error, Node, RestW} -> lists:foldl(
+ fun (W, CurNode) ->
+ NewNode = new_node_id(),
+ trie_add_edge(X, CurNode, NewNode, W),
+ NewNode
+ end, Node, RestW)
+ end.
+
+follow_down_last_node(X, Words) ->
+ follow_down(X, fun (_, Node, _) -> Node end, root, Words).
+
+follow_down_get_path(X, Words) ->
+ {ok, Path} =
+ follow_down(X, fun (W, Node, PathAcc) -> [{Node, W} | PathAcc] end,
+ [{root, none}], Words),
+ Path.
+
+follow_down(X, AccFun, Acc0, Words) ->
+ follow_down(X, root, AccFun, Acc0, Words).
+
+follow_down(_X, _CurNode, _AccFun, Acc, []) ->
+ {ok, Acc};
+follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) ->
+ case trie_child(X, CurNode, W) of
+ {ok, NextNode} -> follow_down(X, NextNode, AccFun,
+ AccFun(W, NextNode, Acc), RestW);
+ error -> {error, Acc, Words}
+ end.
+
+trie_child(X, Node, Word) ->
+ case mnesia:read({rabbit_topic_trie_edge,
+ #trie_edge{exchange_name = X,
+ node_id = Node,
+ word = Word}}) of
+ [#topic_trie_edge{node_id = NextNode}] -> {ok, NextNode};
+ [] -> error
+ end.
+
+trie_bindings(X, Node) ->
+ MatchHead = #topic_trie_binding{
+ trie_binding = #trie_binding{exchange_name = X,
+ node_id = Node,
+ destination = '$1'}},
+ mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]).
+
+trie_add_edge(X, FromNode, ToNode, W) ->
+ trie_edge_op(X, FromNode, ToNode, W, fun mnesia:write/3).
+
+trie_remove_edge(X, FromNode, ToNode, W) ->
+ trie_edge_op(X, FromNode, ToNode, W, fun mnesia:delete_object/3).
+
+trie_edge_op(X, FromNode, ToNode, W, Op) ->
+ ok = Op(rabbit_topic_trie_edge,
+ #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
+ node_id = FromNode,
+ word = W},
+ node_id = ToNode},
+ write).
+
+trie_add_binding(X, Node, D) ->
+ trie_binding_op(X, Node, D, fun mnesia:write/3).
+
+trie_remove_binding(X, Node, D) ->
+ trie_binding_op(X, Node, D, fun mnesia:delete_object/3).
+
+trie_binding_op(X, Node, D, Op) ->
+ ok = Op(rabbit_topic_trie_binding,
+ #topic_trie_binding{
+ trie_binding = #trie_binding{exchange_name = X,
+ node_id = Node,
+ destination = D}},
+ write).
+
+trie_child_count(X, Node) ->
+ count(rabbit_topic_trie_edge,
+ #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
+ node_id = Node,
+ _ = '_'},
+ _ = '_'}).
+
+trie_binding_count(X, Node) ->
+ count(rabbit_topic_trie_binding,
+ #topic_trie_binding{
+ trie_binding = #trie_binding{exchange_name = X,
+ node_id = Node,
+ _ = '_'},
+ _ = '_'}).
+
+count(Table, Match) ->
+ length(mnesia:match_object(Table, Match, read)).
+
+trie_remove_all_edges(X) ->
+ remove_all(rabbit_topic_trie_edge,
+ #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
+ _ = '_'},
+ _ = '_'}).
+
+trie_remove_all_bindings(X) ->
+ remove_all(rabbit_topic_trie_binding,
+ #topic_trie_binding{
+ trie_binding = #trie_binding{exchange_name = X, _ = '_'},
+ _ = '_'}).
+
+remove_all(Table, Pattern) ->
+ lists:foreach(fun (R) -> mnesia:delete_object(Table, R, write) end,
+ mnesia:match_object(Table, Pattern, write)).
+
+new_node_id() ->
+ rabbit_guid:guid().
+
+split_topic_key(Key) ->
+ split_topic_key(Key, [], []).
+
+split_topic_key(<<>>, [], []) ->
+ [];
+split_topic_key(<<>>, RevWordAcc, RevResAcc) ->
+ lists:reverse([lists:reverse(RevWordAcc) | RevResAcc]);
+split_topic_key(<<$., Rest/binary>>, RevWordAcc, RevResAcc) ->
+ split_topic_key(Rest, [], [lists:reverse(RevWordAcc) | RevResAcc]);
+split_topic_key(<<C:8, Rest/binary>>, RevWordAcc, RevResAcc) ->
+ split_topic_key(Rest, [C | RevWordAcc], RevResAcc).
+