diff options
author | Nick Vatamaniuc <vatamane@gmail.com> | 2022-10-27 17:05:36 -0400 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2022-10-27 23:28:06 -0400 |
commit | c0476adc21477f7cf076b8527c1327b158e2b190 (patch) | |
tree | 773c89a3f8171da06a70db81397053a116027ddf | |
parent | c793487a142db841a43d28baa6012b6509dac53c (diff) | |
download | couchdb-c0476adc21477f7cf076b8527c1327b158e2b190.tar.gz |
Optimize smoosh enqueuing
Make sure we don't do an O(n) filtering when enqueuing processes terminate.
Also, cleanup an old code_change clause, and add a tiny optimization to avoid
ununecessary external calls.
-rw-r--r-- | src/smoosh/src/smoosh_server.erl | 71 |
1 files changed, 49 insertions, 22 deletions
diff --git a/src/smoosh/src/smoosh_server.erl b/src/smoosh/src/smoosh_server.erl index 2a55082c1..27277be04 100644 --- a/src/smoosh/src/smoosh_server.erl +++ b/src/smoosh/src/smoosh_server.erl @@ -63,7 +63,8 @@ view_channels = [], tab, event_listener, - waiting = maps:new() + waiting = #{}, + waiting_by_ref = #{} }). -record(channel, { @@ -95,16 +96,16 @@ sync_enqueue(Object, Timeout) -> gen_server:call(?MODULE, {enqueue, Object}, Timeout). handle_db_event(DbName, local_updated, St) -> - smoosh_server:enqueue(DbName), + enqueue(DbName), {ok, St}; handle_db_event(DbName, updated, St) -> - smoosh_server:enqueue(DbName), + enqueue(DbName), {ok, St}; handle_db_event(DbName, {index_commit, IdxName}, St) -> - smoosh_server:enqueue({DbName, IdxName}), + enqueue({DbName, IdxName}), {ok, St}; handle_db_event(DbName, {index_collator_upgrade, IdxName}, St) -> - smoosh_server:enqueue({DbName, IdxName}), + enqueue({DbName, IdxName}), {ok, St}; handle_db_event(_DbName, _Event, St) -> {ok, St}. @@ -193,14 +194,13 @@ handle_cast({new_view_channels, Channels}, State) -> || C <- State#state.view_channels -- Channels ], {noreply, create_missing_channels(State#state{view_channels = Channels})}; -handle_cast({enqueue, Object}, State) -> - #state{waiting = Waiting} = State, +handle_cast({enqueue, Object}, #state{waiting = Waiting} = State) -> case maps:is_key(Object, Waiting) of true -> {noreply, State}; false -> {_Pid, Ref} = spawn_monitor(?MODULE, enqueue_request, [State, Object]), - {noreply, State#state{waiting = maps:put(Object, Ref, Waiting)}} + {noreply, add_enqueue_ref(Object, Ref, State)} end. handle_info({'EXIT', Pid, Reason}, #state{event_listener = Pid} = State) -> @@ -218,12 +218,8 @@ handle_info({'EXIT', Pid, Reason}, State) -> ok end, {noreply, create_missing_channels(State)}; -handle_info({'DOWN', Ref, _, _, _}, State) -> - Waiting = maps:filter( - fun(_Key, Value) -> Value =/= Ref end, - State#state.waiting - ), - {noreply, State#state{waiting = Waiting}}; +handle_info({'DOWN', Ref, process, _, _}, #state{} = State) -> + {noreply, remove_enqueue_ref(Ref, State)}; handle_info(restart_config_listener, State) -> ok = config:listen_for_changes(?MODULE, nil), {noreply, State}; @@ -238,19 +234,23 @@ terminate(_Reason, State) -> ), ok. -code_change(_OldVsn, {state, DbChannels, ViewChannels, Tab, EventListener, Waiting}, _Extra) -> - {ok, #state{ - db_channels = DbChannels, - view_channels = ViewChannels, - tab = Tab, - event_listener = EventListener, - waiting = Waiting - }}; code_change(_OldVsn, State, _Extra) -> {ok, State}. % private functions. +add_enqueue_ref(Object, Ref, #state{} = State) when is_reference(Ref) -> + #state{waiting = Waiting, waiting_by_ref = WaitingByRef} = State, + Waiting1 = Waiting#{Object => Ref}, + WaitingByRef1 = WaitingByRef#{Ref => Object}, + State#state{waiting = Waiting1, waiting_by_ref = WaitingByRef1}. + +remove_enqueue_ref(Ref, #state{} = State) when is_reference(Ref) -> + #state{waiting = Waiting, waiting_by_ref = WaitingByRef} = State, + {Object, WaitingByRef1} = maps:take(Ref, WaitingByRef), + {Ref, Waiting1} = maps:take(Object, Waiting), + State#state{waiting = Waiting1, waiting_by_ref = WaitingByRef1}. + get_channel_status(#channel{name = Name, pid = P}, Acc0) when is_pid(P) -> try gen_server:call(P, status) of {ok, Status} -> @@ -663,4 +663,31 @@ config_listener_mon() -> [] -> undefined end. +add_remove_enqueue_ref_test() -> + ObjCount = 10000, + ObjRefs = [{I, make_ref()} || I <- lists:seq(1, ObjCount)], + + St = lists:foldl( + fun({I, Ref}, #state{} = Acc) -> + add_enqueue_ref(I, Ref, Acc) + end, + #state{}, + ObjRefs + ), + + ?assertEqual(ObjCount, map_size(St#state.waiting)), + ?assertEqual(ObjCount, map_size(St#state.waiting_by_ref)), + + {_Objs, Refs} = lists:unzip(ObjRefs), + St1 = lists:foldl( + fun(Ref, #state{} = Acc) -> + remove_enqueue_ref(Ref, Acc) + end, + St, + Refs + ), + + % It's basically back to an initial (empty) state + ?assertEqual(St1, #state{}). + -endif. |