summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Smyczyński <s.smyczynski@simplito.com>2021-03-04 00:28:59 +0100
committerSebastian Smyczyński <s.smyczynski@simplito.com>2021-03-04 00:28:59 +0100
commit455085809cb5de2570bbba7da8b607c38939e83c (patch)
treea52bedd601daac94d6b641a4bf7c2ffb530e26af
parent3ae9aa2f555171c84e123720d2cfe84b291a735c (diff)
downloaderlang-455085809cb5de2570bbba7da8b607c38939e83c.tar.gz
Optimise mnesia table loading time for busy tables
-rw-r--r--lib/mnesia/src/mnesia_loader.erl52
1 files changed, 41 insertions, 11 deletions
diff --git a/lib/mnesia/src/mnesia_loader.erl b/lib/mnesia/src/mnesia_loader.erl
index c34a83bb6c..a2fde5c808 100644
--- a/lib/mnesia/src/mnesia_loader.erl
+++ b/lib/mnesia/src/mnesia_loader.erl
@@ -320,6 +320,11 @@ start_remote_sender(Node,Tab,Storage) ->
table_init_fun(SenderPid, Storage) ->
fun(read) ->
+ % We want to store subscribed mnesia table events received during
+ % table copying for later processing to not let receiver message queue
+ % to grow too much (which in consequence would slow down the whole copying process)
+ SubscrCache = ets:new(subscr_cache, [private, ordered_set]),
+ put(mnesia_table_receiver_subscr_cache, SubscrCache),
Receiver = self(),
SenderPid ! {Receiver, more},
get_data(SenderPid, Receiver, Storage);
@@ -471,6 +476,10 @@ get_data(Pid, TabRec, Storage) ->
end;
{'EXIT', Pid, Reason} ->
handle_exit(Pid, Reason),
+ get_data(Pid, TabRec, Storage);
+ {mnesia_table_event, _} = SubscrEvent ->
+ SubscrCache = get(mnesia_table_receiver_subscr_cache),
+ ets:insert(SubscrCache, {erlang:unique_integer([monotonic]), SubscrEvent}),
get_data(Pid, TabRec, Storage)
end.
@@ -542,7 +551,7 @@ init_table(Tab, _, Fun, _DetsInfo,_) ->
finish_copy(Storage,Tab,Cs,SenderPid,DatBin,OrigTabRec) ->
TabRef = {Storage, Tab},
- subscr_receiver(TabRef, Cs#cstruct.record_name),
+ subscr_postprocess(TabRef, Cs#cstruct.record_name),
case handle_last(TabRef, Cs#cstruct.type, DatBin) of
ok ->
mnesia_index:init_index(Tab, Storage),
@@ -558,8 +567,37 @@ finish_copy(Storage,Tab,Cs,SenderPid,DatBin,OrigTabRec) ->
down(Tab, Storage)
end.
+subscr_postprocess(TabRef, RecName) ->
+ % process events received during table copying
+ case get(mnesia_table_receiver_subscr_cache) of
+ undefined ->
+ ok;
+ SubscrCache ->
+ ets:foldl(
+ fun({_, Event}, _Acc) ->
+ handle_subscr_event(Event, TabRef, RecName)
+ end, ok, SubscrCache),
+ ets:delete(SubscrCache)
+ end,
+ % and all remaining events
+ subscr_receiver(TabRef, RecName).
+
subscr_receiver(TabRef = {_, Tab}, RecName) ->
receive
+ {mnesia_table_event, {_Op, Val, _Tid}} = Event
+ when element(1, Val) =:= Tab; element(1, Val) =:= schema ->
+ handle_subscr_event(Event, TabRef, RecName),
+ subscr_receiver(TabRef, RecName);
+
+ {'EXIT', Pid, Reason} ->
+ handle_exit(Pid, Reason),
+ subscr_receiver(TabRef, RecName)
+ after 0 ->
+ ok
+ end.
+
+handle_subscr_event(Event, TabRef = {_, Tab}, RecName) ->
+ case Event of
{mnesia_table_event, {Op, Val, _Tid}}
when element(1, Val) =:= Tab ->
if
@@ -567,8 +605,7 @@ subscr_receiver(TabRef = {_, Tab}, RecName) ->
handle_event(TabRef, Op, Val);
true ->
handle_event(TabRef, Op, setelement(1, Val, RecName))
- end,
- subscr_receiver(TabRef, RecName);
+ end;
{mnesia_table_event, {Op, Val, _Tid}} when element(1, Val) =:= schema ->
%% clear_table is faked via two schema events
@@ -576,14 +613,7 @@ subscr_receiver(TabRef = {_, Tab}, RecName) ->
case Op of
delete -> handle_event(TabRef, clear_table, {Tab, all});
_ -> ok
- end,
- subscr_receiver(TabRef, RecName);
-
- {'EXIT', Pid, Reason} ->
- handle_exit(Pid, Reason),
- subscr_receiver(TabRef, RecName)
- after 0 ->
- ok
+ end
end.
handle_event(TabRef, write, Rec) ->