summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@gmail.com>2022-10-27 17:05:36 -0400
committerNick Vatamaniuc <nickva@users.noreply.github.com>2022-10-27 23:28:06 -0400
commitc0476adc21477f7cf076b8527c1327b158e2b190 (patch)
tree773c89a3f8171da06a70db81397053a116027ddf
parentc793487a142db841a43d28baa6012b6509dac53c (diff)
downloadcouchdb-c0476adc21477f7cf076b8527c1327b158e2b190.tar.gz
Optimize smoosh enqueuing
Make sure we don't do an O(n) filtering when enqueuing processes terminate. Also, cleanup an old code_change clause, and add a tiny optimization to avoid ununecessary external calls.
-rw-r--r--src/smoosh/src/smoosh_server.erl71
1 files changed, 49 insertions, 22 deletions
diff --git a/src/smoosh/src/smoosh_server.erl b/src/smoosh/src/smoosh_server.erl
index 2a55082c1..27277be04 100644
--- a/src/smoosh/src/smoosh_server.erl
+++ b/src/smoosh/src/smoosh_server.erl
@@ -63,7 +63,8 @@
view_channels = [],
tab,
event_listener,
- waiting = maps:new()
+ waiting = #{},
+ waiting_by_ref = #{}
}).
-record(channel, {
@@ -95,16 +96,16 @@ sync_enqueue(Object, Timeout) ->
gen_server:call(?MODULE, {enqueue, Object}, Timeout).
handle_db_event(DbName, local_updated, St) ->
- smoosh_server:enqueue(DbName),
+ enqueue(DbName),
{ok, St};
handle_db_event(DbName, updated, St) ->
- smoosh_server:enqueue(DbName),
+ enqueue(DbName),
{ok, St};
handle_db_event(DbName, {index_commit, IdxName}, St) ->
- smoosh_server:enqueue({DbName, IdxName}),
+ enqueue({DbName, IdxName}),
{ok, St};
handle_db_event(DbName, {index_collator_upgrade, IdxName}, St) ->
- smoosh_server:enqueue({DbName, IdxName}),
+ enqueue({DbName, IdxName}),
{ok, St};
handle_db_event(_DbName, _Event, St) ->
{ok, St}.
@@ -193,14 +194,13 @@ handle_cast({new_view_channels, Channels}, State) ->
|| C <- State#state.view_channels -- Channels
],
{noreply, create_missing_channels(State#state{view_channels = Channels})};
-handle_cast({enqueue, Object}, State) ->
- #state{waiting = Waiting} = State,
+handle_cast({enqueue, Object}, #state{waiting = Waiting} = State) ->
case maps:is_key(Object, Waiting) of
true ->
{noreply, State};
false ->
{_Pid, Ref} = spawn_monitor(?MODULE, enqueue_request, [State, Object]),
- {noreply, State#state{waiting = maps:put(Object, Ref, Waiting)}}
+ {noreply, add_enqueue_ref(Object, Ref, State)}
end.
handle_info({'EXIT', Pid, Reason}, #state{event_listener = Pid} = State) ->
@@ -218,12 +218,8 @@ handle_info({'EXIT', Pid, Reason}, State) ->
ok
end,
{noreply, create_missing_channels(State)};
-handle_info({'DOWN', Ref, _, _, _}, State) ->
- Waiting = maps:filter(
- fun(_Key, Value) -> Value =/= Ref end,
- State#state.waiting
- ),
- {noreply, State#state{waiting = Waiting}};
+handle_info({'DOWN', Ref, process, _, _}, #state{} = State) ->
+ {noreply, remove_enqueue_ref(Ref, State)};
handle_info(restart_config_listener, State) ->
ok = config:listen_for_changes(?MODULE, nil),
{noreply, State};
@@ -238,19 +234,23 @@ terminate(_Reason, State) ->
),
ok.
-code_change(_OldVsn, {state, DbChannels, ViewChannels, Tab, EventListener, Waiting}, _Extra) ->
- {ok, #state{
- db_channels = DbChannels,
- view_channels = ViewChannels,
- tab = Tab,
- event_listener = EventListener,
- waiting = Waiting
- }};
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
% private functions.
+add_enqueue_ref(Object, Ref, #state{} = State) when is_reference(Ref) ->
+ #state{waiting = Waiting, waiting_by_ref = WaitingByRef} = State,
+ Waiting1 = Waiting#{Object => Ref},
+ WaitingByRef1 = WaitingByRef#{Ref => Object},
+ State#state{waiting = Waiting1, waiting_by_ref = WaitingByRef1}.
+
+remove_enqueue_ref(Ref, #state{} = State) when is_reference(Ref) ->
+ #state{waiting = Waiting, waiting_by_ref = WaitingByRef} = State,
+ {Object, WaitingByRef1} = maps:take(Ref, WaitingByRef),
+ {Ref, Waiting1} = maps:take(Object, Waiting),
+ State#state{waiting = Waiting1, waiting_by_ref = WaitingByRef1}.
+
get_channel_status(#channel{name = Name, pid = P}, Acc0) when is_pid(P) ->
try gen_server:call(P, status) of
{ok, Status} ->
@@ -663,4 +663,31 @@ config_listener_mon() ->
[] -> undefined
end.
+add_remove_enqueue_ref_test() ->
+ ObjCount = 10000,
+ ObjRefs = [{I, make_ref()} || I <- lists:seq(1, ObjCount)],
+
+ St = lists:foldl(
+ fun({I, Ref}, #state{} = Acc) ->
+ add_enqueue_ref(I, Ref, Acc)
+ end,
+ #state{},
+ ObjRefs
+ ),
+
+ ?assertEqual(ObjCount, map_size(St#state.waiting)),
+ ?assertEqual(ObjCount, map_size(St#state.waiting_by_ref)),
+
+ {_Objs, Refs} = lists:unzip(ObjRefs),
+ St1 = lists:foldl(
+ fun(Ref, #state{} = Acc) ->
+ remove_enqueue_ref(Ref, Acc)
+ end,
+ St,
+ Refs
+ ),
+
+ % It's basically back to an initial (empty) state
+ ?assertEqual(St1, #state{}).
+
-endif.