diff options
| author | Diana Parra Corbacho <dparracorbac@vmware.com> | 2023-01-13 14:01:52 +0100 |
|---|---|---|
| committer | Diana Parra Corbacho <dparracorbac@vmware.com> | 2023-01-31 10:23:16 +0100 |
| commit | 5b39e7e4cec8d844c2ac4ad5809065ecaa0251b2 (patch) | |
| tree | 0d63f7c8081e2865b866e599e2f1a27b49cfb229 | |
| parent | ba1670a95a71829b3d6ee4555b64ad386ec66614 (diff) | |
| download | rabbitmq-server-git-5b39e7e4cec8d844c2ac4ad5809065ecaa0251b2.tar.gz | |
Move jms topic exchange Mnesia-specific code to rabbit_db_* modules
3 files changed, 162 insertions, 91 deletions
diff --git a/deps/rabbitmq_jms_topic_exchange/src/rabbit_db_jms_exchange.erl b/deps/rabbitmq_jms_topic_exchange/src/rabbit_db_jms_exchange.erl new file mode 100644 index 0000000000..6ce28123ea --- /dev/null +++ b/deps/rabbitmq_jms_topic_exchange/src/rabbit_db_jms_exchange.erl @@ -0,0 +1,143 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +%% ----------------------------------------------------------------------------- +-module(rabbit_db_jms_exchange). + +-include_lib("rabbit_common/include/rabbit.hrl"). +-include("rabbit_jms_topic_exchange.hrl"). + +-export([ + setup_schema/0, + create_or_update/3, + insert/2, + get/1, + delete/1, + delete/3 + ]). + +%% ------------------------------------------------------------------- +%% setup_schema() +%% ------------------------------------------------------------------- + +setup_schema() -> + rabbit_db:run( + #{mnesia => fun() -> setup_schema_in_mnesia() end + }). + +setup_schema_in_mnesia() -> + case mnesia:create_table( ?JMS_TOPIC_TABLE + , [ {attributes, record_info(fields, ?JMS_TOPIC_RECORD)} + , {record_name, ?JMS_TOPIC_RECORD} + , {type, set} ] + ) of + {atomic, ok} -> ok; + {aborted, {already_exists, ?JMS_TOPIC_TABLE}} -> ok + end, + ok. + +%% ------------------------------------------------------------------- +%% create_or_update(). +%% ------------------------------------------------------------------- + +create_or_update(XName, BindingKeyAndFun, ErrorFun) -> + rabbit_db:run( + #{mnesia => + fun() -> create_or_update_in_mnesia(XName, BindingKeyAndFun, ErrorFun) end + }). + +create_or_update_in_mnesia(XName, BindingKeyAndFun, ErrorFun) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + #?JMS_TOPIC_RECORD{x_selector_funs = BindingFuns} = + read_state_in_mnesia(XName, ErrorFun), + write_state_fun_in_mnesia(XName, put_item(BindingFuns, BindingKeyAndFun)) + end). + +%% ------------------------------------------------------------------- +%% insert(). +%% ------------------------------------------------------------------- + +insert(XName, BFuns) -> + rabbit_db:run( + #{mnesia => fun() -> insert_in_mnesia(XName, BFuns) end + }). + +insert_in_mnesia(XName, BFuns) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + write_state_fun_in_mnesia(XName, BFuns) + end). + +%% ------------------------------------------------------------------- +%% get(). +%% ------------------------------------------------------------------- + +get(XName) -> + rabbit_db:run( + #{mnesia => fun() -> get_in_mnesia(XName) end + }). + +get_in_mnesia(XName) -> + mnesia:async_dirty( + fun() -> + case mnesia:read(?JMS_TOPIC_TABLE, XName, read) of + [#?JMS_TOPIC_RECORD{x_selector_funs = BindingFuns}] -> + BindingFuns; + _ -> + not_found + end + end, + [] + ). + +%% ------------------------------------------------------------------- +%% delete(). +%% ------------------------------------------------------------------- + +delete(XName) -> + rabbit_db:run( + #{mnesia => fun() -> delete_in_mnesia(XName) end + }). + +delete_in_mnesia(XName) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> mnesia:delete(?JMS_TOPIC_TABLE, XName, write) end). + +delete(XName, BindingKeys, ErrorFun) -> + rabbit_db:run( + #{mnesia => + fun() -> delete_in_mnesia(XName, BindingKeys, ErrorFun) end + }). + +delete_in_mnesia(XName, BindingKeys, ErrorFun) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + #?JMS_TOPIC_RECORD{x_selector_funs = BindingFuns} = + read_state_in_mnesia(XName, ErrorFun), + write_state_fun_in_mnesia(XName, remove_items(BindingFuns, BindingKeys)) + end). + +read_state_in_mnesia(XName, ErrorFun) -> + case mnesia:read(?JMS_TOPIC_TABLE, XName, write) of + [Rec] -> Rec; + _ -> ErrorFun(XName) + end. + +write_state_fun_in_mnesia(XName, BFuns) -> + mnesia:write( ?JMS_TOPIC_TABLE + , #?JMS_TOPIC_RECORD{x_name = XName, x_selector_funs = BFuns} + , write ). + +%% ------------------------------------------------------------------- +%% dictionary handling +%% ------------------------------------------------------------------- + +% add an item to the dictionary of binding functions +put_item(Dict, {Key, Item}) -> dict:store(Key, Item, Dict). + +% remove a list of keyed items from the dictionary, by key +remove_items(Dict, []) -> Dict; +remove_items(Dict, [Key | Keys]) -> remove_items(dict:erase(Key, Dict), Keys). diff --git a/deps/rabbitmq_jms_topic_exchange/src/rabbit_jms_topic_exchange.erl b/deps/rabbitmq_jms_topic_exchange/src/rabbit_jms_topic_exchange.erl index 8032feb49c..66288d3f9a 100644 --- a/deps/rabbitmq_jms_topic_exchange/src/rabbit_jms_topic_exchange.erl +++ b/deps/rabbitmq_jms_topic_exchange/src/rabbit_jms_topic_exchange.erl @@ -22,7 +22,7 @@ , route/2 , validate/1 , create/2 - , delete/3 + , delete/2 , validate_binding/2 , add_binding/3 , remove_bindings/3 @@ -55,14 +55,7 @@ % Initialise database table for all exchanges of type <<"x-jms-topic">> setup_db_schema() -> - case mnesia:create_table( ?JMS_TOPIC_TABLE - , [ {attributes, record_info(fields, ?JMS_TOPIC_RECORD)} - , {record_name, ?JMS_TOPIC_RECORD} - , {type, set} ] - ) of - {atomic, ok} -> ok; - {aborted, {already_exists, ?JMS_TOPIC_TABLE}} -> ok - end. + rabbit_db_jms_exchange:setup_schema(). %%---------------------------------------------------------------------------- %% R E F E R E N C E T Y P E I N F O R M A T I O N @@ -111,30 +104,25 @@ route( #exchange{name = XName} validate(_X) -> ok. % After exchange declaration and recovery -create(transaction, #exchange{name = XName}) -> - add_initial_record(XName); -create(_Tx, _X) -> - ok. +create(none, #exchange{name = XName}) -> + add_initial_record(XName). % Delete an exchange -delete(transaction, #exchange{name = XName}, _Bs) -> - delete_state(XName), - ok; -delete(_Tx, _X, _Bs) -> - ok. +delete(none, #exchange{name = XName}) -> + delete_state(XName). % Before add binding validate_binding(_X, _B) -> ok. % A new binding has ben added or recovered -add_binding( Tx +add_binding( none , #exchange{name = XName} , #binding{key = BindingKey, destination = Dest, args = Args} ) -> Selector = get_string_arg(Args, ?RJMS_COMPILED_SELECTOR_ARG), BindGen = generate_binding_fun(Selector), - case {Tx, BindGen} of - {transaction, {ok, BindFun}} -> + case BindGen of + {ok, BindFun} -> add_binding_fun(XName, {{BindingKey, Dest}, BindFun}); {none, error} -> parsing_error(XName, Selector, Dest); @@ -144,13 +132,11 @@ add_binding( Tx ok. % Binding removal -remove_bindings( transaction +remove_bindings( none , #exchange{name = XName} , Bindings ) -> remove_binding_funs(XName, Bindings), - ok; -remove_bindings(_Tx, _X, _Bs) -> ok. % Exchange argument equivalence @@ -234,66 +220,27 @@ selector_match(Selector, Headers) -> % get binding funs from state (using dirty_reads) get_binding_funs_x(XName) -> - mnesia:async_dirty( - fun() -> - case read_state_no_error(XName) of - not_found -> - not_found; - #?JMS_TOPIC_RECORD{x_selector_funs = BindingFuns} -> - BindingFuns - end - end, - [] - ). + rabbit_db_jms_exchange:get(XName). add_initial_record(XName) -> write_state_fun(XName, dict:new()). % add binding fun to binding fun dictionary add_binding_fun(XName, BindingKeyAndFun) -> - #?JMS_TOPIC_RECORD{x_selector_funs = BindingFuns} = read_state_for_update(XName), - write_state_fun(XName, put_item(BindingFuns, BindingKeyAndFun)). + rabbit_db_jms_exchange:create_or_update(XName, BindingKeyAndFun, fun exchange_state_corrupt_error/1). % remove binding funs from binding fun dictionary remove_binding_funs(XName, Bindings) -> BindingKeys = [ {BindingKey, DestName} || #binding{key = BindingKey, destination = DestName} <- Bindings ], - #?JMS_TOPIC_RECORD{x_selector_funs = BindingFuns} = read_state_for_update(XName), - write_state_fun(XName, remove_items(BindingFuns, BindingKeys)). - -% add an item to the dictionary of binding functions -put_item(Dict, {Key, Item}) -> dict:store(Key, Item, Dict). - -% remove a list of keyed items from the dictionary, by key -remove_items(Dict, []) -> Dict; -remove_items(Dict, [Key | Keys]) -> remove_items(dict:erase(Key, Dict), Keys). + rabbit_db_jms_exchange:delete(XName, BindingKeys, fun exchange_state_corrupt_error/1). % delete all the state saved for this exchange delete_state(XName) -> - mnesia:delete(?JMS_TOPIC_TABLE, XName, write). - -% Basic read for update -read_state_for_update(XName) -> read_state(XName, write). - -% Lockable read -read_state(XName, Lock) -> - case mnesia:read(?JMS_TOPIC_TABLE, XName, Lock) of - [Rec] -> Rec; - _ -> exchange_state_corrupt_error(XName) - end. - -read_state_no_error(XName) -> - case mnesia:read(?JMS_TOPIC_TABLE, XName, read) of - [Rec] -> Rec; - _ -> not_found - end. - - + rabbit_db_jms_exchange:delete(XName). % Basic write write_state_fun(XName, BFuns) -> - mnesia:write( ?JMS_TOPIC_TABLE - , #?JMS_TOPIC_RECORD{x_name = XName, x_selector_funs = BFuns} - , write ). + rabbit_db_jms_exchange:insert(XName, BFuns). %%---------------------------------------------------------------------------- %% E R R O R S diff --git a/deps/rabbitmq_jms_topic_exchange/test/rjms_topic_selector_unit_SUITE.erl b/deps/rabbitmq_jms_topic_exchange/test/rjms_topic_selector_unit_SUITE.erl index cb0befc113..303c8253fe 100644 --- a/deps/rabbitmq_jms_topic_exchange/test/rjms_topic_selector_unit_SUITE.erl +++ b/deps/rabbitmq_jms_topic_exchange/test/rjms_topic_selector_unit_SUITE.erl @@ -20,15 +20,8 @@ -import(rabbit_jms_topic_exchange, [ description/0 , serialise_events/0 - , route/2 , validate/1 - , create/2 - , delete/3 - , validate_binding/2 - , add_binding/3 - , remove_bindings/3 - , assert_args_equivalence/2 - , policy_changed/3 ]). + , validate_binding/2 ]). all() -> @@ -42,10 +35,7 @@ groups() -> description_test, serialise_events_test, validate_test, - create_test, - delete_test, - validate_binding_test, - add_binding_test + validate_binding_test ]} ]. @@ -82,19 +72,10 @@ serialise_events_test(_Config) -> ?assertMatch(false, serialise_events()). validate_test(_Config) -> - ?assertEqual(ok, validate(any_exchange)). - -create_test(_Config) -> - ?assertEqual(ok, create(none, any_exchange)). - -delete_test(_Config) -> - ?assertEqual(ok, delete(none, any_exchange, any_bindings)). + ?assertEqual(ok, validate(dummy_exchange())). validate_binding_test(_Config) -> - ?assertEqual(ok, validate_binding(any_exchange, any_bindings)). - -add_binding_test(_Config) -> - ?assertEqual(ok, add_binding(none, dummy_exchange(), dummy_binding())). + ?assertEqual(ok, validate_binding(dummy_exchange(), dummy_binding())). dummy_exchange() -> #exchange{name = <<"XName">>, arguments = []}. |
