summaryrefslogtreecommitdiff
path: root/src/rabbit_exchange_type_topic.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_exchange_type_topic.erl')
-rw-r--r--src/rabbit_exchange_type_topic.erl270
1 files changed, 0 insertions, 270 deletions
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
deleted file mode 100644
index af00fe88..00000000
--- a/src/rabbit_exchange_type_topic.erl
+++ /dev/null
@@ -1,270 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
-%%
-
--module(rabbit_exchange_type_topic).
-
--include("rabbit.hrl").
-
--behaviour(rabbit_exchange_type).
-
--export([description/0, serialise_events/0, route/2]).
--export([validate/1, validate_binding/2,
- create/2, delete/3, policy_changed/2, add_binding/3,
- remove_bindings/3, assert_args_equivalence/2]).
-
--rabbit_boot_step({?MODULE,
- [{description, "exchange type topic"},
- {mfa, {rabbit_registry, register,
- [exchange, <<"topic">>, ?MODULE]}},
- {requires, rabbit_registry},
- {enables, kernel_ready}]}).
-
-%%----------------------------------------------------------------------------
-
-description() ->
- [{description, <<"AMQP topic exchange, as per the AMQP specification">>}].
-
-serialise_events() -> false.
-
-%% NB: This may return duplicate results in some situations (that's ok)
-route(#exchange{name = X},
- #delivery{message = #basic_message{routing_keys = Routes}}) ->
- lists:append([begin
- Words = split_topic_key(RKey),
- mnesia:async_dirty(fun trie_match/2, [X, Words])
- end || RKey <- Routes]).
-
-validate(_X) -> ok.
-validate_binding(_X, _B) -> ok.
-create(_Tx, _X) -> ok.
-
-delete(transaction, #exchange{name = X}, _Bs) ->
- trie_remove_all_nodes(X),
- trie_remove_all_edges(X),
- trie_remove_all_bindings(X),
- ok;
-delete(none, _Exchange, _Bs) ->
- ok.
-
-policy_changed(_X1, _X2) -> ok.
-
-add_binding(transaction, _Exchange, Binding) ->
- internal_add_binding(Binding);
-add_binding(none, _Exchange, _Binding) ->
- ok.
-
-remove_bindings(transaction, _X, Bs) ->
- %% See rabbit_binding:lock_route_tables for the rationale for
- %% taking table locks.
- case Bs of
- [_] -> ok;
- _ -> [mnesia:lock({table, T}, write) ||
- T <- [rabbit_topic_trie_node,
- rabbit_topic_trie_edge,
- rabbit_topic_trie_binding]]
- end,
- [begin
- Path = [{FinalNode, _} | _] =
- follow_down_get_path(X, split_topic_key(K)),
- trie_remove_binding(X, FinalNode, D, Args),
- remove_path_if_empty(X, Path)
- end || #binding{source = X, key = K, destination = D, args = Args} <- Bs],
- ok;
-remove_bindings(none, _X, _Bs) ->
- ok.
-
-assert_args_equivalence(X, Args) ->
- rabbit_exchange:assert_args_equivalence(X, Args).
-
-%%----------------------------------------------------------------------------
-
-internal_add_binding(#binding{source = X, key = K, destination = D,
- args = Args}) ->
- FinalNode = follow_down_create(X, split_topic_key(K)),
- trie_add_binding(X, FinalNode, D, Args),
- ok.
-
-trie_match(X, Words) ->
- trie_match(X, root, Words, []).
-
-trie_match(X, Node, [], ResAcc) ->
- trie_match_part(X, Node, "#", fun trie_match_skip_any/4, [],
- trie_bindings(X, Node) ++ ResAcc);
-trie_match(X, Node, [W | RestW] = Words, ResAcc) ->
- lists:foldl(fun ({WArg, MatchFun, RestWArg}, Acc) ->
- trie_match_part(X, Node, WArg, MatchFun, RestWArg, Acc)
- end, ResAcc, [{W, fun trie_match/4, RestW},
- {"*", fun trie_match/4, RestW},
- {"#", fun trie_match_skip_any/4, Words}]).
-
-trie_match_part(X, Node, Search, MatchFun, RestW, ResAcc) ->
- case trie_child(X, Node, Search) of
- {ok, NextNode} -> MatchFun(X, NextNode, RestW, ResAcc);
- error -> ResAcc
- end.
-
-trie_match_skip_any(X, Node, [], ResAcc) ->
- trie_match(X, Node, [], ResAcc);
-trie_match_skip_any(X, Node, [_ | RestW] = Words, ResAcc) ->
- trie_match_skip_any(X, Node, RestW,
- trie_match(X, Node, Words, ResAcc)).
-
-follow_down_create(X, Words) ->
- case follow_down_last_node(X, Words) of
- {ok, FinalNode} -> FinalNode;
- {error, Node, RestW} -> lists:foldl(
- fun (W, CurNode) ->
- NewNode = new_node_id(),
- trie_add_edge(X, CurNode, NewNode, W),
- NewNode
- end, Node, RestW)
- end.
-
-follow_down_last_node(X, Words) ->
- follow_down(X, fun (_, Node, _) -> Node end, root, Words).
-
-follow_down_get_path(X, Words) ->
- {ok, Path} =
- follow_down(X, fun (W, Node, PathAcc) -> [{Node, W} | PathAcc] end,
- [{root, none}], Words),
- Path.
-
-follow_down(X, AccFun, Acc0, Words) ->
- follow_down(X, root, AccFun, Acc0, Words).
-
-follow_down(_X, _CurNode, _AccFun, Acc, []) ->
- {ok, Acc};
-follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) ->
- case trie_child(X, CurNode, W) of
- {ok, NextNode} -> follow_down(X, NextNode, AccFun,
- AccFun(W, NextNode, Acc), RestW);
- error -> {error, Acc, Words}
- end.
-
-remove_path_if_empty(_, [{root, none}]) ->
- ok;
-remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) ->
- case mnesia:read(rabbit_topic_trie_node,
- #trie_node{exchange_name = X, node_id = Node}, write) of
- [] -> trie_remove_edge(X, Parent, Node, W),
- remove_path_if_empty(X, RestPath);
- _ -> ok
- end.
-
-trie_child(X, Node, Word) ->
- case mnesia:read({rabbit_topic_trie_edge,
- #trie_edge{exchange_name = X,
- node_id = Node,
- word = Word}}) of
- [#topic_trie_edge{node_id = NextNode}] -> {ok, NextNode};
- [] -> error
- end.
-
-trie_bindings(X, Node) ->
- MatchHead = #topic_trie_binding{
- trie_binding = #trie_binding{exchange_name = X,
- node_id = Node,
- destination = '$1',
- arguments = '_'}},
- mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]).
-
-trie_update_node_counts(X, Node, Field, Delta) ->
- E = case mnesia:read(rabbit_topic_trie_node,
- #trie_node{exchange_name = X,
- node_id = Node}, write) of
- [] -> #topic_trie_node{trie_node = #trie_node{
- exchange_name = X,
- node_id = Node},
- edge_count = 0,
- binding_count = 0};
- [E0] -> E0
- end,
- case setelement(Field, E, element(Field, E) + Delta) of
- #topic_trie_node{edge_count = 0, binding_count = 0} ->
- ok = mnesia:delete_object(rabbit_topic_trie_node, E, write);
- EN ->
- ok = mnesia:write(rabbit_topic_trie_node, EN, write)
- end.
-
-trie_add_edge(X, FromNode, ToNode, W) ->
- trie_update_node_counts(X, FromNode, #topic_trie_node.edge_count, +1),
- trie_edge_op(X, FromNode, ToNode, W, fun mnesia:write/3).
-
-trie_remove_edge(X, FromNode, ToNode, W) ->
- trie_update_node_counts(X, FromNode, #topic_trie_node.edge_count, -1),
- trie_edge_op(X, FromNode, ToNode, W, fun mnesia:delete_object/3).
-
-trie_edge_op(X, FromNode, ToNode, W, Op) ->
- ok = Op(rabbit_topic_trie_edge,
- #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
- node_id = FromNode,
- word = W},
- node_id = ToNode},
- write).
-
-trie_add_binding(X, Node, D, Args) ->
- trie_update_node_counts(X, Node, #topic_trie_node.binding_count, +1),
- trie_binding_op(X, Node, D, Args, fun mnesia:write/3).
-
-trie_remove_binding(X, Node, D, Args) ->
- trie_update_node_counts(X, Node, #topic_trie_node.binding_count, -1),
- trie_binding_op(X, Node, D, Args, fun mnesia:delete_object/3).
-
-trie_binding_op(X, Node, D, Args, Op) ->
- ok = Op(rabbit_topic_trie_binding,
- #topic_trie_binding{
- trie_binding = #trie_binding{exchange_name = X,
- node_id = Node,
- destination = D,
- arguments = Args}},
- write).
-
-trie_remove_all_nodes(X) ->
- remove_all(rabbit_topic_trie_node,
- #topic_trie_node{trie_node = #trie_node{exchange_name = X,
- _ = '_'},
- _ = '_'}).
-
-trie_remove_all_edges(X) ->
- remove_all(rabbit_topic_trie_edge,
- #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
- _ = '_'},
- _ = '_'}).
-
-trie_remove_all_bindings(X) ->
- remove_all(rabbit_topic_trie_binding,
- #topic_trie_binding{
- trie_binding = #trie_binding{exchange_name = X, _ = '_'},
- _ = '_'}).
-
-remove_all(Table, Pattern) ->
- lists:foreach(fun (R) -> mnesia:delete_object(Table, R, write) end,
- mnesia:match_object(Table, Pattern, write)).
-
-new_node_id() ->
- rabbit_guid:gen().
-
-split_topic_key(Key) ->
- split_topic_key(Key, [], []).
-
-split_topic_key(<<>>, [], []) ->
- [];
-split_topic_key(<<>>, RevWordAcc, RevResAcc) ->
- lists:reverse([lists:reverse(RevWordAcc) | RevResAcc]);
-split_topic_key(<<$., Rest/binary>>, RevWordAcc, RevResAcc) ->
- split_topic_key(Rest, [], [lists:reverse(RevWordAcc) | RevResAcc]);
-split_topic_key(<<C:8, Rest/binary>>, RevWordAcc, RevResAcc) ->
- split_topic_key(Rest, [C | RevWordAcc], RevResAcc).