summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_recent_history_exchange/src/rabbit_exchange_type_recent_history.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbitmq_recent_history_exchange/src/rabbit_exchange_type_recent_history.erl')
-rw-r--r--deps/rabbitmq_recent_history_exchange/src/rabbit_exchange_type_recent_history.erl206
1 files changed, 206 insertions, 0 deletions
diff --git a/deps/rabbitmq_recent_history_exchange/src/rabbit_exchange_type_recent_history.erl b/deps/rabbitmq_recent_history_exchange/src/rabbit_exchange_type_recent_history.erl
new file mode 100644
index 0000000000..7758f72184
--- /dev/null
+++ b/deps/rabbitmq_recent_history_exchange/src/rabbit_exchange_type_recent_history.erl
@@ -0,0 +1,206 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+
+-module(rabbit_exchange_type_recent_history).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit_framing.hrl").
+-include("rabbit_recent_history.hrl").
+
+-behaviour(rabbit_exchange_type).
+
+-import(rabbit_misc, [table_lookup/2]).
+
+-export([description/0, serialise_events/0, route/2]).
+-export([validate/1, validate_binding/2, create/2, delete/3, add_binding/3,
+ remove_bindings/3, assert_args_equivalence/2, policy_changed/2]).
+-export([setup_schema/0, disable_plugin/0]).
+-export([info/1, info/2]).
+
+-rabbit_boot_step({?MODULE,
+ [{description, "exchange type x-recent-history"},
+ {mfa, {rabbit_registry, register,
+ [exchange, <<"x-recent-history">>, ?MODULE]}},
+ {cleanup, {?MODULE, disable_plugin, []}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+-rabbit_boot_step({rabbit_exchange_type_recent_history_mnesia,
+ [{description, "recent history exchange type: mnesia"},
+ {mfa, {?MODULE, setup_schema, []}},
+ {requires, database},
+ {enables, external_infrastructure}]}).
+
+-define(INTEGER_ARG_TYPES, [byte, short, signedint, long]).
+
+info(_X) -> [].
+info(_X, _) -> [].
+
+description() ->
+ [{name, <<"x-recent-history">>},
+ {description, <<"Recent History Exchange.">>}].
+
+serialise_events() -> false.
+
+route(#exchange{name = XName,
+ arguments = Args},
+ #delivery{message = Message}) ->
+ Length = table_lookup(Args, <<"x-recent-history-length">>),
+ maybe_cache_msg(XName, Message, Length),
+ rabbit_router:match_routing_key(XName, ['_']).
+
+validate(#exchange{arguments = Args}) ->
+ case table_lookup(Args, <<"x-recent-history-length">>) of
+ undefined ->
+ ok;
+ {Type, Val} ->
+ case check_int_arg(Type) of
+ ok when Val > 0 ->
+ ok;
+ _ ->
+ rabbit_misc:protocol_error(precondition_failed,
+ "Invalid argument ~p, "
+ "'x-recent-history-length' "
+ "must be a positive integer",
+ [Val])
+ end
+ end.
+
+validate_binding(_X, _B) -> ok.
+create(_Tx, _X) -> ok.
+policy_changed(_X1, _X2) -> ok.
+
+delete(transaction, #exchange{ name = XName }, _Bs) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ mnesia:delete(?RH_TABLE, XName, write)
+ end),
+ ok;
+delete(none, _Exchange, _Bs) ->
+ ok.
+
+add_binding(transaction, #exchange{ name = XName },
+ #binding{ destination = #resource{kind = queue} = QName }) ->
+ case rabbit_amqqueue:lookup(QName) of
+ {error, not_found} ->
+ destination_not_found_error(QName);
+ {ok, Q} ->
+ Msgs = get_msgs_from_cache(XName),
+ deliver_messages([Q], Msgs)
+ end,
+ ok;
+add_binding(transaction, #exchange{ name = XName },
+ #binding{ destination = #resource{kind = exchange} = DestName }) ->
+ case rabbit_exchange:lookup(DestName) of
+ {error, not_found} ->
+ destination_not_found_error(DestName);
+ {ok, X} ->
+ Msgs = get_msgs_from_cache(XName),
+ [begin
+ Delivery = rabbit_basic:delivery(false, false, Msg, undefined),
+ Qs = rabbit_exchange:route(X, Delivery),
+ case rabbit_amqqueue:lookup(Qs) of
+ [] ->
+ destination_not_found_error(Qs);
+ QPids ->
+ deliver_messages(QPids, [Msg])
+ end
+ end || Msg <- Msgs]
+ end,
+ ok;
+add_binding(none, _Exchange, _Binding) ->
+ ok.
+
+remove_bindings(_Tx, _X, _Bs) -> ok.
+
+assert_args_equivalence(X, Args) ->
+ rabbit_exchange:assert_args_equivalence(X, Args).
+
+%%----------------------------------------------------------------------------
+
+setup_schema() ->
+ mnesia:create_table(?RH_TABLE,
+ [{attributes, record_info(fields, cached)},
+ {record_name, cached},
+ {type, set}]),
+ mnesia:add_table_copy(?RH_TABLE, node(), ram_copies),
+ mnesia:wait_for_tables([?RH_TABLE], 30000),
+ ok.
+
+disable_plugin() ->
+ rabbit_registry:unregister(exchange, <<"x-recent-history">>),
+ mnesia:delete_table(?RH_TABLE),
+ ok.
+
+%%----------------------------------------------------------------------------
+%%private
+maybe_cache_msg(XName,
+ #basic_message{content =
+ #content{properties =
+ #'P_basic'{headers = Headers}}}
+ = Message,
+ Length) ->
+ case Headers of
+ undefined ->
+ cache_msg(XName, Message, Length);
+ _ ->
+ Store = table_lookup(Headers, <<"x-recent-history-no-store">>),
+ case Store of
+ {bool, true} ->
+ ok;
+ _ ->
+ cache_msg(XName, Message, Length)
+ end
+ end.
+
+cache_msg(XName, Message, Length) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ Cached = get_msgs_from_cache(XName),
+ store_msg(XName, Cached, Message, Length)
+ end).
+
+get_msgs_from_cache(XName) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:read(?RH_TABLE, XName) of
+ [] ->
+ [];
+ [#cached{key = XName, content=Cached}] ->
+ Cached
+ end
+ end).
+
+store_msg(Key, Cached, Message, undefined) ->
+ store_msg0(Key, Cached, Message, ?KEEP_NB);
+store_msg(Key, Cached, Message, {_Type, Length}) ->
+ store_msg0(Key, Cached, Message, Length).
+
+store_msg0(Key, Cached, Message, Length) ->
+ mnesia:write(?RH_TABLE,
+ #cached{key = Key,
+ content = [Message|lists:sublist(Cached, Length-1)]},
+ write).
+
+deliver_messages(Qs, Msgs) ->
+ lists:map(
+ fun (Msg) ->
+ Delivery = rabbit_basic:delivery(false, false, Msg, undefined),
+ rabbit_amqqueue:deliver(Qs, Delivery)
+ end, lists:reverse(Msgs)).
+
+destination_not_found_error(DestName) ->
+ rabbit_misc:protocol_error(
+ internal_error,
+ "could not find queue/exchange '~s'",
+ [DestName]).
+
+%% adapted from rabbit_amqqueue.erl
+check_int_arg(Type) ->
+ case lists:member(Type, ?INTEGER_ARG_TYPES) of
+ true -> ok;
+ false -> {error, {unacceptable_type, Type}}
+ end.