summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiana Parra Corbacho <dparracorbac@vmware.com>2023-01-13 14:01:52 +0100
committerDiana Parra Corbacho <dparracorbac@vmware.com>2023-01-31 10:23:16 +0100
commit5b39e7e4cec8d844c2ac4ad5809065ecaa0251b2 (patch)
tree0d63f7c8081e2865b866e599e2f1a27b49cfb229
parentba1670a95a71829b3d6ee4555b64ad386ec66614 (diff)
downloadrabbitmq-server-git-5b39e7e4cec8d844c2ac4ad5809065ecaa0251b2.tar.gz
Move jms topic exchange Mnesia-specific code to rabbit_db_* modules
-rw-r--r--deps/rabbitmq_jms_topic_exchange/src/rabbit_db_jms_exchange.erl143
-rw-r--r--deps/rabbitmq_jms_topic_exchange/src/rabbit_jms_topic_exchange.erl83
-rw-r--r--deps/rabbitmq_jms_topic_exchange/test/rjms_topic_selector_unit_SUITE.erl27
3 files changed, 162 insertions, 91 deletions
diff --git a/deps/rabbitmq_jms_topic_exchange/src/rabbit_db_jms_exchange.erl b/deps/rabbitmq_jms_topic_exchange/src/rabbit_db_jms_exchange.erl
new file mode 100644
index 0000000000..6ce28123ea
--- /dev/null
+++ b/deps/rabbitmq_jms_topic_exchange/src/rabbit_db_jms_exchange.erl
@@ -0,0 +1,143 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved.
+%% -----------------------------------------------------------------------------
+-module(rabbit_db_jms_exchange).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include("rabbit_jms_topic_exchange.hrl").
+
+-export([
+ setup_schema/0,
+ create_or_update/3,
+ insert/2,
+ get/1,
+ delete/1,
+ delete/3
+ ]).
+
+%% -------------------------------------------------------------------
+%% setup_schema()
+%% -------------------------------------------------------------------
+
+setup_schema() ->
+ rabbit_db:run(
+ #{mnesia => fun() -> setup_schema_in_mnesia() end
+ }).
+
+setup_schema_in_mnesia() ->
+ case mnesia:create_table( ?JMS_TOPIC_TABLE
+ , [ {attributes, record_info(fields, ?JMS_TOPIC_RECORD)}
+ , {record_name, ?JMS_TOPIC_RECORD}
+ , {type, set} ]
+ ) of
+ {atomic, ok} -> ok;
+ {aborted, {already_exists, ?JMS_TOPIC_TABLE}} -> ok
+ end,
+ ok.
+
+%% -------------------------------------------------------------------
+%% create_or_update().
+%% -------------------------------------------------------------------
+
+create_or_update(XName, BindingKeyAndFun, ErrorFun) ->
+ rabbit_db:run(
+ #{mnesia =>
+ fun() -> create_or_update_in_mnesia(XName, BindingKeyAndFun, ErrorFun) end
+ }).
+
+create_or_update_in_mnesia(XName, BindingKeyAndFun, ErrorFun) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ #?JMS_TOPIC_RECORD{x_selector_funs = BindingFuns} =
+ read_state_in_mnesia(XName, ErrorFun),
+ write_state_fun_in_mnesia(XName, put_item(BindingFuns, BindingKeyAndFun))
+ end).
+
+%% -------------------------------------------------------------------
+%% insert().
+%% -------------------------------------------------------------------
+
+insert(XName, BFuns) ->
+ rabbit_db:run(
+ #{mnesia => fun() -> insert_in_mnesia(XName, BFuns) end
+ }).
+
+insert_in_mnesia(XName, BFuns) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ write_state_fun_in_mnesia(XName, BFuns)
+ end).
+
+%% -------------------------------------------------------------------
+%% get().
+%% -------------------------------------------------------------------
+
+get(XName) ->
+ rabbit_db:run(
+ #{mnesia => fun() -> get_in_mnesia(XName) end
+ }).
+
+get_in_mnesia(XName) ->
+ mnesia:async_dirty(
+ fun() ->
+ case mnesia:read(?JMS_TOPIC_TABLE, XName, read) of
+ [#?JMS_TOPIC_RECORD{x_selector_funs = BindingFuns}] ->
+ BindingFuns;
+ _ ->
+ not_found
+ end
+ end,
+ []
+ ).
+
+%% -------------------------------------------------------------------
+%% delete().
+%% -------------------------------------------------------------------
+
+delete(XName) ->
+ rabbit_db:run(
+ #{mnesia => fun() -> delete_in_mnesia(XName) end
+ }).
+
+delete_in_mnesia(XName) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() -> mnesia:delete(?JMS_TOPIC_TABLE, XName, write) end).
+
+delete(XName, BindingKeys, ErrorFun) ->
+ rabbit_db:run(
+ #{mnesia =>
+ fun() -> delete_in_mnesia(XName, BindingKeys, ErrorFun) end
+ }).
+
+delete_in_mnesia(XName, BindingKeys, ErrorFun) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ #?JMS_TOPIC_RECORD{x_selector_funs = BindingFuns} =
+ read_state_in_mnesia(XName, ErrorFun),
+ write_state_fun_in_mnesia(XName, remove_items(BindingFuns, BindingKeys))
+ end).
+
+read_state_in_mnesia(XName, ErrorFun) ->
+ case mnesia:read(?JMS_TOPIC_TABLE, XName, write) of
+ [Rec] -> Rec;
+ _ -> ErrorFun(XName)
+ end.
+
+write_state_fun_in_mnesia(XName, BFuns) ->
+ mnesia:write( ?JMS_TOPIC_TABLE
+ , #?JMS_TOPIC_RECORD{x_name = XName, x_selector_funs = BFuns}
+ , write ).
+
+%% -------------------------------------------------------------------
+%% dictionary handling
+%% -------------------------------------------------------------------
+
+% add an item to the dictionary of binding functions
+put_item(Dict, {Key, Item}) -> dict:store(Key, Item, Dict).
+
+% remove a list of keyed items from the dictionary, by key
+remove_items(Dict, []) -> Dict;
+remove_items(Dict, [Key | Keys]) -> remove_items(dict:erase(Key, Dict), Keys).
diff --git a/deps/rabbitmq_jms_topic_exchange/src/rabbit_jms_topic_exchange.erl b/deps/rabbitmq_jms_topic_exchange/src/rabbit_jms_topic_exchange.erl
index 8032feb49c..66288d3f9a 100644
--- a/deps/rabbitmq_jms_topic_exchange/src/rabbit_jms_topic_exchange.erl
+++ b/deps/rabbitmq_jms_topic_exchange/src/rabbit_jms_topic_exchange.erl
@@ -22,7 +22,7 @@
, route/2
, validate/1
, create/2
- , delete/3
+ , delete/2
, validate_binding/2
, add_binding/3
, remove_bindings/3
@@ -55,14 +55,7 @@
% Initialise database table for all exchanges of type <<"x-jms-topic">>
setup_db_schema() ->
- case mnesia:create_table( ?JMS_TOPIC_TABLE
- , [ {attributes, record_info(fields, ?JMS_TOPIC_RECORD)}
- , {record_name, ?JMS_TOPIC_RECORD}
- , {type, set} ]
- ) of
- {atomic, ok} -> ok;
- {aborted, {already_exists, ?JMS_TOPIC_TABLE}} -> ok
- end.
+ rabbit_db_jms_exchange:setup_schema().
%%----------------------------------------------------------------------------
%% R E F E R E N C E T Y P E I N F O R M A T I O N
@@ -111,30 +104,25 @@ route( #exchange{name = XName}
validate(_X) -> ok.
% After exchange declaration and recovery
-create(transaction, #exchange{name = XName}) ->
- add_initial_record(XName);
-create(_Tx, _X) ->
- ok.
+create(none, #exchange{name = XName}) ->
+ add_initial_record(XName).
% Delete an exchange
-delete(transaction, #exchange{name = XName}, _Bs) ->
- delete_state(XName),
- ok;
-delete(_Tx, _X, _Bs) ->
- ok.
+delete(none, #exchange{name = XName}) ->
+ delete_state(XName).
% Before add binding
validate_binding(_X, _B) -> ok.
% A new binding has ben added or recovered
-add_binding( Tx
+add_binding( none
, #exchange{name = XName}
, #binding{key = BindingKey, destination = Dest, args = Args}
) ->
Selector = get_string_arg(Args, ?RJMS_COMPILED_SELECTOR_ARG),
BindGen = generate_binding_fun(Selector),
- case {Tx, BindGen} of
- {transaction, {ok, BindFun}} ->
+ case BindGen of
+ {ok, BindFun} ->
add_binding_fun(XName, {{BindingKey, Dest}, BindFun});
{none, error} ->
parsing_error(XName, Selector, Dest);
@@ -144,13 +132,11 @@ add_binding( Tx
ok.
% Binding removal
-remove_bindings( transaction
+remove_bindings( none
, #exchange{name = XName}
, Bindings
) ->
remove_binding_funs(XName, Bindings),
- ok;
-remove_bindings(_Tx, _X, _Bs) ->
ok.
% Exchange argument equivalence
@@ -234,66 +220,27 @@ selector_match(Selector, Headers) ->
% get binding funs from state (using dirty_reads)
get_binding_funs_x(XName) ->
- mnesia:async_dirty(
- fun() ->
- case read_state_no_error(XName) of
- not_found ->
- not_found;
- #?JMS_TOPIC_RECORD{x_selector_funs = BindingFuns} ->
- BindingFuns
- end
- end,
- []
- ).
+ rabbit_db_jms_exchange:get(XName).
add_initial_record(XName) ->
write_state_fun(XName, dict:new()).
% add binding fun to binding fun dictionary
add_binding_fun(XName, BindingKeyAndFun) ->
- #?JMS_TOPIC_RECORD{x_selector_funs = BindingFuns} = read_state_for_update(XName),
- write_state_fun(XName, put_item(BindingFuns, BindingKeyAndFun)).
+ rabbit_db_jms_exchange:create_or_update(XName, BindingKeyAndFun, fun exchange_state_corrupt_error/1).
% remove binding funs from binding fun dictionary
remove_binding_funs(XName, Bindings) ->
BindingKeys = [ {BindingKey, DestName} || #binding{key = BindingKey, destination = DestName} <- Bindings ],
- #?JMS_TOPIC_RECORD{x_selector_funs = BindingFuns} = read_state_for_update(XName),
- write_state_fun(XName, remove_items(BindingFuns, BindingKeys)).
-
-% add an item to the dictionary of binding functions
-put_item(Dict, {Key, Item}) -> dict:store(Key, Item, Dict).
-
-% remove a list of keyed items from the dictionary, by key
-remove_items(Dict, []) -> Dict;
-remove_items(Dict, [Key | Keys]) -> remove_items(dict:erase(Key, Dict), Keys).
+ rabbit_db_jms_exchange:delete(XName, BindingKeys, fun exchange_state_corrupt_error/1).
% delete all the state saved for this exchange
delete_state(XName) ->
- mnesia:delete(?JMS_TOPIC_TABLE, XName, write).
-
-% Basic read for update
-read_state_for_update(XName) -> read_state(XName, write).
-
-% Lockable read
-read_state(XName, Lock) ->
- case mnesia:read(?JMS_TOPIC_TABLE, XName, Lock) of
- [Rec] -> Rec;
- _ -> exchange_state_corrupt_error(XName)
- end.
-
-read_state_no_error(XName) ->
- case mnesia:read(?JMS_TOPIC_TABLE, XName, read) of
- [Rec] -> Rec;
- _ -> not_found
- end.
-
-
+ rabbit_db_jms_exchange:delete(XName).
% Basic write
write_state_fun(XName, BFuns) ->
- mnesia:write( ?JMS_TOPIC_TABLE
- , #?JMS_TOPIC_RECORD{x_name = XName, x_selector_funs = BFuns}
- , write ).
+ rabbit_db_jms_exchange:insert(XName, BFuns).
%%----------------------------------------------------------------------------
%% E R R O R S
diff --git a/deps/rabbitmq_jms_topic_exchange/test/rjms_topic_selector_unit_SUITE.erl b/deps/rabbitmq_jms_topic_exchange/test/rjms_topic_selector_unit_SUITE.erl
index cb0befc113..303c8253fe 100644
--- a/deps/rabbitmq_jms_topic_exchange/test/rjms_topic_selector_unit_SUITE.erl
+++ b/deps/rabbitmq_jms_topic_exchange/test/rjms_topic_selector_unit_SUITE.erl
@@ -20,15 +20,8 @@
-import(rabbit_jms_topic_exchange, [ description/0
, serialise_events/0
- , route/2
, validate/1
- , create/2
- , delete/3
- , validate_binding/2
- , add_binding/3
- , remove_bindings/3
- , assert_args_equivalence/2
- , policy_changed/3 ]).
+ , validate_binding/2 ]).
all() ->
@@ -42,10 +35,7 @@ groups() ->
description_test,
serialise_events_test,
validate_test,
- create_test,
- delete_test,
- validate_binding_test,
- add_binding_test
+ validate_binding_test
]}
].
@@ -82,19 +72,10 @@ serialise_events_test(_Config) ->
?assertMatch(false, serialise_events()).
validate_test(_Config) ->
- ?assertEqual(ok, validate(any_exchange)).
-
-create_test(_Config) ->
- ?assertEqual(ok, create(none, any_exchange)).
-
-delete_test(_Config) ->
- ?assertEqual(ok, delete(none, any_exchange, any_bindings)).
+ ?assertEqual(ok, validate(dummy_exchange())).
validate_binding_test(_Config) ->
- ?assertEqual(ok, validate_binding(any_exchange, any_bindings)).
-
-add_binding_test(_Config) ->
- ?assertEqual(ok, add_binding(none, dummy_exchange(), dummy_binding())).
+ ?assertEqual(ok, validate_binding(dummy_exchange(), dummy_binding())).
dummy_exchange() ->
#exchange{name = <<"XName">>, arguments = []}.