summaryrefslogtreecommitdiff
path: root/src/rabbit_exchange.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_exchange.erl')
-rw-r--r--src/rabbit_exchange.erl380
1 files changed, 380 insertions, 0 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
new file mode 100644
index 00000000..113b7878
--- /dev/null
+++ b/src/rabbit_exchange.erl
@@ -0,0 +1,380 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd.,
+%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd., Cohesive Financial Technologies
+%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008
+%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
+%% Technologies Ltd.;
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_exchange).
+-include_lib("stdlib/include/qlc.hrl").
+-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
+
+-export([recover/0, declare/6, lookup/1, lookup_or_die/1,
+ list_vhost_exchanges/1, list_exchange_bindings/1,
+ simple_publish/6, simple_publish/3,
+ route/2]).
+-export([add_binding/2, delete_binding/2]).
+-export([delete/2]).
+-export([check_type/1, assert_type/2, topic_matches/2]).
+
+-import(mnesia).
+-import(sets).
+-import(lists).
+-import(qlc).
+-import(regexp).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(publish_res() :: {'ok', [pid()]} |
+ not_found() | {'error', 'unroutable' | 'not_delivered'}).
+
+-spec(recover/0 :: () -> 'ok').
+-spec(declare/6 :: (realm_name(), name(), exchange_type(), bool(), bool(),
+ amqp_table()) -> exchange()).
+-spec(check_type/1 :: (binary()) -> atom()).
+-spec(assert_type/2 :: (exchange(), atom()) -> 'ok').
+-spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()).
+-spec(lookup_or_die/1 :: (exchange_name()) -> exchange()).
+-spec(list_vhost_exchanges/1 :: (vhost()) -> [exchange()]).
+-spec(list_exchange_bindings/1 :: (exchange_name()) ->
+ [{queue_name(), routing_key(), amqp_table()}]).
+-spec(simple_publish/6 ::
+ (bool(), bool(), exchange_name(), routing_key(), binary(), binary()) ->
+ publish_res()).
+-spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()).
+-spec(route/2 :: (exchange(), routing_key()) -> [pid()]).
+-spec(add_binding/2 :: (binding_spec(), amqqueue()) ->
+ 'ok' | not_found() |
+ {'error', 'durability_settings_incompatible'}).
+-spec(delete_binding/2 :: (binding_spec(), amqqueue()) ->
+ 'ok' | not_found()).
+-spec(topic_matches/2 :: (binary(), binary()) -> bool()).
+-spec(delete/2 :: (exchange_name(), bool()) ->
+ 'ok' | not_found() | {'error', 'in_use'}).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+recover() ->
+ ok = recover_durable_exchanges(),
+ ok.
+
+recover_durable_exchanges() ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ mnesia:foldl(fun (Exchange, Acc) ->
+ ok = mnesia:write(Exchange),
+ Acc
+ end, ok, durable_exchanges)
+ end).
+
+declare(RealmName, NameBin, Type, Durable, AutoDelete, Args) ->
+ XName = rabbit_misc:r(RealmName, exchange, NameBin),
+ Exchange = #exchange{name = XName,
+ type = Type,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args},
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:wread({exchange, XName}) of
+ [] -> ok = mnesia:write(Exchange),
+ if Durable ->
+ ok = mnesia:write(
+ durable_exchanges, Exchange, write);
+ true -> ok
+ end,
+ ok = rabbit_realm:add(RealmName, XName),
+ Exchange;
+ [ExistingX] -> ExistingX
+ end
+ end).
+
+check_type(<<"fanout">>) ->
+ fanout;
+check_type(<<"direct">>) ->
+ direct;
+check_type(<<"topic">>) ->
+ topic;
+check_type(T) ->
+ rabbit_misc:protocol_error(
+ command_invalid, "invalid exchange type '~s'", [T]).
+
+assert_type(#exchange{ type = ActualType }, RequiredType)
+ when ActualType == RequiredType ->
+ ok;
+assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) ->
+ rabbit_misc:protocol_error(
+ not_allowed, "cannot redeclare ~s of type '~s' with type '~s'",
+ [rabbit_misc:rs(Name), ActualType, RequiredType]).
+
+lookup(Name) ->
+ rabbit_misc:dirty_read({exchange, Name}).
+
+lookup_or_die(Name) ->
+ case lookup(Name) of
+ {ok, X} -> X;
+ {error, not_found} ->
+ rabbit_misc:protocol_error(
+ not_found, "no ~s", [rabbit_misc:rs(Name)])
+ end.
+
+list_vhost_exchanges(VHostPath) ->
+ mnesia:dirty_match_object(
+ #exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}).
+
+list_exchange_bindings(Name) ->
+ [{QueueName, RoutingKey, Arguments} ||
+ #binding{handlers = Handlers} <- bindings_for_exchange(Name),
+ #handler{binding_spec = #binding_spec{routing_key = RoutingKey,
+ arguments = Arguments},
+ queue = QueueName} <- Handlers].
+
+bindings_for_exchange(Name) ->
+ qlc:e(qlc:q([B ||
+ B = #binding{key = K} <- mnesia:table(binding),
+ element(1, K) == Name])).
+
+empty_handlers() ->
+ [].
+
+%% Usable by Erlang code that wants to publish messages.
+simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) ->
+ {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
+ Content = #content{class_id = ClassId,
+ properties = #'P_basic'{content_type = ContentTypeBin},
+ properties_bin = none,
+ payload_fragments_rev = [BodyBin]},
+ Message = #basic_message{exchange_name = ExchangeName,
+ routing_key = RoutingKeyBin,
+ content = Content,
+ persistent_key = none},
+ simple_publish(Mandatory, Immediate, Message).
+
+%% Usable by Erlang code that wants to publish messages.
+simple_publish(Mandatory, Immediate,
+ Message = #basic_message{exchange_name = ExchangeName,
+ routing_key = RoutingKey}) ->
+ case lookup(ExchangeName) of
+ {ok, Exchange} ->
+ QPids = route(Exchange, RoutingKey),
+ rabbit_router:deliver(QPids, Mandatory, Immediate,
+ none, Message);
+ {error, Error} -> {error, Error}
+ end.
+
+%% return the list of qpids to which a message with a given routing
+%% key, sent to a particular exchange, should be delivered.
+%%
+%% The function ensures that a qpid appears in the return list exactly
+%% as many times as a message should be delivered to it. With the
+%% current exchange types that is at most once.
+route(#exchange{name = Name, type = topic}, RoutingKey) ->
+ sets:to_list(
+ sets:union(
+ mnesia:activity(
+ async_dirty,
+ fun () ->
+ qlc:e(qlc:q([handler_qpids(H) ||
+ #binding{key = {Name1, PatternKey},
+ handlers = H}
+ <- mnesia:table(binding),
+ Name == Name1,
+ topic_matches(PatternKey, RoutingKey)]))
+ end)));
+
+route(#exchange{name = Name, type = Type}, RoutingKey) ->
+ BindingKey = delivery_key_for_type(Type, Name, RoutingKey),
+ case rabbit_misc:dirty_read({binding, BindingKey}) of
+ {ok, #binding{handlers = H}} -> sets:to_list(handler_qpids(H));
+ {error, not_found} -> []
+ end.
+
+delivery_key_for_type(fanout, Name, _RoutingKey) ->
+ {Name, fanout};
+delivery_key_for_type(_Type, Name, RoutingKey) ->
+ {Name, RoutingKey}.
+
+call_with_exchange(Name, Fun) ->
+ case mnesia:wread({exchange, Name}) of
+ [] -> {error, not_found};
+ [X] -> Fun(X)
+ end.
+
+make_handler(BindingSpec, #amqqueue{name = QueueName, pid = QPid}) ->
+ #handler{binding_spec = BindingSpec, queue = QueueName, qpid = QPid}.
+
+add_binding(BindingSpec = #binding_spec{exchange_name = ExchangeName,
+ routing_key = RoutingKey}, Q) ->
+ call_with_exchange(
+ ExchangeName,
+ fun (X) -> if Q#amqqueue.durable and not(X#exchange.durable) ->
+ {error, durability_settings_incompatible};
+ true ->
+ internal_add_binding(
+ X, RoutingKey, make_handler(BindingSpec, Q))
+ end
+ end).
+
+delete_binding(BindingSpec = #binding_spec{exchange_name = ExchangeName,
+ routing_key = RoutingKey}, Q) ->
+ call_with_exchange(
+ ExchangeName,
+ fun (X) -> ok = internal_delete_binding(
+ X, RoutingKey, make_handler(BindingSpec, Q)),
+ maybe_auto_delete(X)
+ end).
+
+%% Must run within a transaction.
+maybe_auto_delete(#exchange{auto_delete = false}) ->
+ ok;
+maybe_auto_delete(#exchange{name = ExchangeName, auto_delete = true}) ->
+ case internal_delete(ExchangeName, true) of
+ {error, in_use} -> ok;
+ ok -> ok
+ end.
+
+handlers_isempty([]) -> true;
+handlers_isempty([_|_]) -> false.
+
+extend_handlers(Handlers, Handler) -> [Handler | Handlers].
+
+delete_handler(Handlers, Handler) -> lists:delete(Handler, Handlers).
+
+handler_qpids(Handlers) ->
+ sets:from_list([QPid || #handler{qpid = QPid} <- Handlers]).
+
+%% Must run within a transaction.
+internal_add_binding(#exchange{name = ExchangeName, type = Type},
+ RoutingKey, Handler) ->
+ BindingKey = delivery_key_for_type(Type, ExchangeName, RoutingKey),
+ ok = add_handler_to_binding(BindingKey, Handler).
+
+%% Must run within a transaction.
+internal_delete_binding(#exchange{name = ExchangeName, type = Type}, RoutingKey, Handler) ->
+ BindingKey = delivery_key_for_type(Type, ExchangeName, RoutingKey),
+ remove_handler_from_binding(BindingKey, Handler),
+ ok.
+
+%% Must run within a transaction.
+add_handler_to_binding(BindingKey, Handler) ->
+ ok = case mnesia:wread({binding, BindingKey}) of
+ [] ->
+ ok = mnesia:write(
+ #binding{key = BindingKey,
+ handlers = extend_handlers(
+ empty_handlers(), Handler)});
+ [B = #binding{handlers = H}] ->
+ ok = mnesia:write(
+ B#binding{handlers = extend_handlers(H, Handler)})
+ end.
+
+%% Must run within a transaction.
+remove_handler_from_binding(BindingKey, Handler) ->
+ case mnesia:wread({binding, BindingKey}) of
+ [] -> empty;
+ [B = #binding{handlers = H}] ->
+ H1 = delete_handler(H, Handler),
+ case handlers_isempty(H1) of
+ true ->
+ ok = mnesia:delete({binding, BindingKey}),
+ empty;
+ _ ->
+ ok = mnesia:write(B#binding{handlers = H1}),
+ not_empty
+ end
+ end.
+
+split_topic_key(Key) ->
+ {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."),
+ KeySplit.
+
+topic_matches(PatternKey, RoutingKey) ->
+ P = split_topic_key(PatternKey),
+ R = split_topic_key(RoutingKey),
+ topic_matches1(P, R).
+
+topic_matches1(["#"], _R) ->
+ true;
+topic_matches1(["#" | PTail], R) ->
+ last_topic_match(PTail, [], lists:reverse(R));
+topic_matches1([], []) ->
+ true;
+topic_matches1(["*" | PatRest], [_ | ValRest]) ->
+ topic_matches1(PatRest, ValRest);
+topic_matches1([PatElement | PatRest], [ValElement | ValRest]) when PatElement == ValElement ->
+ topic_matches1(PatRest, ValRest);
+topic_matches1(_, _) ->
+ false.
+
+last_topic_match(P, R, []) ->
+ topic_matches1(P, R);
+last_topic_match(P, R, [BacktrackNext | BacktrackList]) ->
+ topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList).
+
+delete(ExchangeName, IfUnused) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> internal_delete(ExchangeName, IfUnused) end).
+
+internal_delete(ExchangeName, _IfUnused = true) ->
+ Bindings = bindings_for_exchange(ExchangeName),
+ case Bindings of
+ [] -> do_internal_delete(ExchangeName, Bindings);
+ _ ->
+ case lists:all(fun (#binding{handlers = H}) -> handlers_isempty(H) end,
+ Bindings) of
+ true ->
+ %% There are no handlers anywhere in any of the
+ %% bindings for this exchange.
+ do_internal_delete(ExchangeName, Bindings);
+ false ->
+ %% There was at least one real handler
+ %% present. It's still in use.
+ {error, in_use}
+ end
+ end;
+internal_delete(ExchangeName, false) ->
+ do_internal_delete(ExchangeName, bindings_for_exchange(ExchangeName)).
+
+forcibly_remove_handlers(Handlers) ->
+ lists:foreach(
+ fun (#handler{binding_spec = BindingSpec, queue = QueueName}) ->
+ ok = rabbit_amqqueue:binding_forcibly_removed(
+ BindingSpec, QueueName)
+ end, Handlers),
+ ok.
+
+do_internal_delete(ExchangeName, Bindings) ->
+ case mnesia:wread({exchange, ExchangeName}) of
+ [] -> {error, not_found};
+ _ ->
+ lists:foreach(fun (#binding{key = K, handlers = H}) ->
+ ok = forcibly_remove_handlers(H),
+ ok = mnesia:delete({binding, K})
+ end, Bindings),
+ ok = mnesia:delete({durable_exchanges, ExchangeName}),
+ ok = mnesia:delete({exchange, ExchangeName}),
+ ok = rabbit_realm:delete_from_all(ExchangeName)
+ end.