summaryrefslogtreecommitdiff
path: root/src/rabbit_exchange_decorator_lvc.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_exchange_decorator_lvc.erl')
-rw-r--r--src/rabbit_exchange_decorator_lvc.erl111
1 files changed, 111 insertions, 0 deletions
diff --git a/src/rabbit_exchange_decorator_lvc.erl b/src/rabbit_exchange_decorator_lvc.erl
new file mode 100644
index 00000000..b5ce1954
--- /dev/null
+++ b/src/rabbit_exchange_decorator_lvc.erl
@@ -0,0 +1,111 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2013-2013 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_exchange_decorator_lvc).
+-include("rabbit.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "LVC exchange decorator"},
+ {mfa, {rabbit_registry, register,
+ [exchange_decorator, <<"lvc">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [exchange_decorator_route, <<"lvc">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"lvc">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, recovery}]}).
+
+-behaviour(rabbit_exchange_decorator).
+-behaviour(rabbit_policy_validator).
+
+-export([description/0, serialise_events/1]).
+-export([create/2, delete/3, add_binding/3, remove_bindings/3,
+ policy_changed/3, route/2]).
+-export([validate_policy/1]).
+
+%%----------------------------------------------------------------------------
+
+description() ->
+ [{description, <<"LVC exchange decorator">>}].
+
+serialise_events(_) -> false.
+
+create(_Tx, _X) -> ok.
+
+delete(_Tx, _X, _Bs) -> ok.
+
+add_binding(none, X, #binding{key = Key,
+ destination = #resource{kind = queue} = Queue}) ->
+ case policy(X) of
+ false ->
+ ok;
+ _ ->
+ rabbit_amqqueue:with(
+ queue_name(X, Key),
+ fun (#amqqueue{pid = QPid}) ->
+ gen_server2:call(QPid, {copy, Queue})
+ end),
+ ok
+ end;
+add_binding(_Tx, _X, _B) ->
+ ok.
+
+remove_bindings(transaction, _X, _Bs) ->
+ ok;
+remove_bindings(none, X = #exchange{name = _XName}, _Bs) ->
+ case policy(X) of
+ false -> ok;
+ _Max -> ok % TODO: no key in binding?
+ end.
+
+route(X, #delivery{message = #basic_message{routing_keys = RKs}}) ->
+ case policy(X) of
+ false ->
+ [];
+ Max ->
+ [begin
+ rabbit_amqqueue:declare(queue_name(X, RK), false, false,
+ [{<<"x-max-length">>, long, Max}],
+ none),
+ queue_name(X, RK)
+ end || RK <- RKs]
+ end.
+
+policy_changed(_Tx, _OldX, _NewX) -> ok.
+
+%%----------------------------------------------------------------------------
+
+validate_policy([{<<"lvc">>, MaxCache}]) when is_integer(MaxCache) ->
+ ok;
+validate_policy(Invalid) ->
+ {error, "~p invalid LVC policy", [Invalid]}.
+
+%%----------------------------------------------------------------------------
+
+queue_name(#exchange{name = #resource{virtual_host = VHostPath,
+ kind = exchange,
+ name = Name}}, Key) ->
+ rabbit_misc:r(
+ VHostPath, queue,
+ list_to_binary([Name, ".", Key, ".",
+ base64:encode(erlang:md5(term_to_binary({Name, Key})))])).
+
+
+policy(#exchange{} = X) ->
+ case rabbit_policy:get(<<"lvc">>, X) of
+ {ok, Max} -> Max;
+ {error, not_found} -> false
+ end.