% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
%% Public API.
-export([
    start_link/0
]).

%% Listener callbacks (the callbacks implemented further down in this file).
-export([
    init/1,
    terminate/2,
    handle_event/3,
    handle_cast/2,
    handle_info/2
]).
%% Delay, in milliseconds, before re-subscribing to config changes after the
%% config event handler exits (see the gen_event_EXIT clause of handle_info/2).
%% The macro may only be defined once per compilation, so the two values are
%% selected by build type: test builds use the short delay so the restart test
%% can observe the re-subscription within its 1000 ms wait.
-ifdef(TEST).
-define(RELISTEN_DELAY, 500).
-else.
-define(RELISTEN_DELAY, 5000).
-endif.
%% Listener state.
-record(state, {
    %% Database names as reported by mem3_sync:nodes_db/0, shards_db/0 and
    %% users_db/0; handle_event/3 matches incoming event names against them.
    nodes,
    shards,
    users,
    %% Value of mem3.sync_delay; together with `frequency` it determines the
    %% number of buckets (Delay div Frequency + 1).
    delay,
    %% Value of mem3.sync_frequency; used as the callback timeout in
    %% milliseconds and as the minimum interval between bucket pushes.
    frequency,
    %% os:timestamp() of the most recent push, or `undefined` before the
    %% first call to maybe_push_shards/1.
    last_push,
    %% List of sets of shard names. New shards are added to the head bucket;
    %% the last bucket is the one pushed next.
    buckets
}).
%% Calling mem3_sync:push/2 on every update has a measurable performance cost,
%% so we'd like to coalesce multiple update messages from couch_event into a
%% single push call. Doing this while ensuring both correctness (i.e., no lost
%% updates) and an even load profile is somewhat subtle. This implementation
%% groups updated shards in a list of "buckets" (see bucket_shard/2) and
%% guarantees that each shard is in no more than one bucket at a time - i.e.,
%% any update messages received before the shard's current bucket has been
%% pushed will be ignored - thereby reducing the frequency with which a single
%% shard will be pushed. mem3_sync:push/2 is called on all shards in the
%% *oldest* bucket roughly every mem3.sync_frequency milliseconds (see
%% maybe_push_shards/1) to even out the load on mem3_sync.
%% Starts this module as a couch_event_listener, with no init arguments and
%% the `all_dbs` option.
start_link() ->
    InitArgs = [],
    Options = [all_dbs],
    couch_event_listener:start_link(?MODULE, InitArgs, Options).
%% Listener initialisation: subscribes to the mem3 config keys, reads
%% mem3.sync_delay (default 5000) and mem3.sync_frequency (default 500), and
%% starts with `Delay div Frequency + 1` empty buckets.
init(_Args) ->
    ok = subscribe_for_config(),
    Delay = config:get_integer("mem3", "sync_delay", 5000),
    Frequency = config:get_integer("mem3", "sync_frequency", 500),
    BucketCount = Delay div Frequency + 1,
    EmptyBuckets = lists:duplicate(BucketCount, sets:new()),
    State = #state{
        nodes = mem3_sync:nodes_db(),
        shards = mem3_sync:shards_db(),
        users = mem3_sync:users_db(),
        delay = Delay,
        frequency = Frequency,
        buckets = EmptyBuckets
    },
    {ok, State}.
%% Nothing to clean up on shutdown; both arguments are ignored.
terminate(_, _) ->
    ok.
%% Database event callback. Every clause returns through maybe_push_shards/1
%% so that the push timer keeps running.
%%
%% * nodes db updated      - push it to every mem3 node that is connected.
%% * shards/users db updated - push it to mem3_sync:find_next_node().
%% * shard updated         - queue the shard in a bucket (see bucket_shard/2).
%% * shard deleted         - tell mem3_sync to drop it.
%% * anything else         - ignored.
handle_event(DbName, updated, #state{nodes = DbName} = State) ->
    Known = mem3:nodes(),
    Connected = nodes(),
    Targets = [Node || Node <- Known, lists:member(Node, Connected)],
    lists:foreach(fun(Node) -> mem3_sync:push(DbName, Node) end, Targets),
    maybe_push_shards(State);
handle_event(DbName, updated, #state{shards = DbName} = State) ->
    Target = mem3_sync:find_next_node(),
    mem3_sync:push(DbName, Target),
    maybe_push_shards(State);
handle_event(DbName, updated, #state{users = DbName} = State) ->
    Target = mem3_sync:find_next_node(),
    mem3_sync:push(DbName, Target),
    maybe_push_shards(State);
handle_event(<<"shards/", _/binary>> = ShardName, updated, State) ->
    Queued = bucket_shard(ShardName, State#state.buckets),
    maybe_push_shards(State#state{buckets = Queued});
handle_event(<<"shards/", _:18/binary, _/binary>> = ShardName, deleted, State) ->
    mem3_sync:remove_shard(ShardName),
    maybe_push_shards(State);
handle_event(_DbName, _Event, State) ->
    maybe_push_shards(State).
%% Cast callback. `{set_frequency, F}` and `{set_delay, D}` (sent by
%% set_config/3) store the new value and resize the bucket list to match;
%% any other cast is logged. All clauses return via maybe_push_shards/1.
handle_cast({set_frequency, NewFrequency}, State) ->
    Resized = rebucket_shards(State#state.delay, NewFrequency, State#state.buckets),
    maybe_push_shards(State#state{frequency = NewFrequency, buckets = Resized});
handle_cast({set_delay, NewDelay}, State) ->
    Resized = rebucket_shards(NewDelay, State#state.frequency, State#state.buckets),
    maybe_push_shards(State#state{delay = NewDelay, buckets = Resized});
handle_cast(Unexpected, State) ->
    couch_log:notice("unexpected cast to mem3_sync_event_listener: ~p", [Unexpected]),
    maybe_push_shards(State).
%% Out-of-band message callback.
%%
%% * timeout                     - the push timer fired.
%% * {config_change, "mem3", K, V, _} for sync_delay / sync_frequency -
%%   parse the value and cast it back to ourselves (see set_config/3).
%% * {gen_event_EXIT, _, _}      - the config handler went away; schedule a
%%   re-subscription after ?RELISTEN_DELAY milliseconds.
%% * restart_config_listener     - perform that re-subscription.
%% * {get_state, Ref, Caller}    - reply to Caller with {Ref, State}.
%% * anything else               - logged.
%%
%% Every clause returns through maybe_push_shards/1. That includes get_state:
%% returning {ok, St} directly would leave the listener without a timeout, so
%% shards already waiting in a bucket would not be pushed until some other
%% message happened to arrive.
handle_info(timeout, St) ->
    maybe_push_shards(St);
handle_info({config_change, "mem3", "sync_delay", Value, _}, St) ->
    set_config(set_delay, Value, "ignoring bad value for mem3.sync_delay"),
    maybe_push_shards(St);
handle_info({config_change, "mem3", "sync_frequency", Value, _}, St) ->
    set_config(set_frequency, Value, "ignoring bad value for mem3.sync_frequency"),
    maybe_push_shards(St);
handle_info({gen_event_EXIT, _Handler, _Reason}, St) ->
    erlang:send_after(?RELISTEN_DELAY, self(), restart_config_listener),
    maybe_push_shards(St);
handle_info(restart_config_listener, St) ->
    ok = subscribe_for_config(),
    maybe_push_shards(St);
handle_info({get_state, Ref, Caller}, St) ->
    %% Reply with the state as it was when the request was handled, then keep
    %% the push timer armed like every other callback.
    Caller ! {Ref, St},
    maybe_push_shards(St);
handle_info(Msg, St) ->
    couch_log:notice("unexpected info to mem3_sync_event_listener: ~p", [Msg]),
    maybe_push_shards(St).
%% Parses a config value and, when it is usable, casts `{Cmd, Integer}` to
%% this process so the change is applied in handle_cast/2.
%%
%% Cmd   - `set_delay` or `set_frequency`.
%% Value - the raw config value; anything list_to_integer/1 rejects is
%%         treated as a bad value.
%% Error - message prefix logged when the value is ignored.
%%
%% Values are validated here because they come straight from configuration:
%% the frequency is used as a divisor and as a timeout, so it must be a
%% positive integer, and a negative delay would produce a non-positive bucket
%% count. Letting such values through would crash the listener.
set_config(Cmd, Value, Error) ->
    try list_to_integer(Value) of
        IntegerValue when Cmd =:= set_frequency, IntegerValue > 0 ->
            couch_event_listener:cast(self(), {Cmd, IntegerValue});
        IntegerValue when Cmd =/= set_frequency, IntegerValue >= 0 ->
            couch_event_listener:cast(self(), {Cmd, IntegerValue});
        _OutOfRange ->
            couch_log:warning("~s: ~p", [Error, Value])
    catch
        error:badarg ->
            couch_log:warning("~s: ~p", [Error, Value])
    end.
%% Queues ShardName for a later push. If the shard is already present in any
%% bucket the list is returned unchanged; otherwise the shard is added to the
%% first bucket.
bucket_shard(ShardName, [B | Bs] = Buckets0) ->
    AlreadyQueued = lists:any(
        fun(Bucket) -> sets:is_element(ShardName, Bucket) end,
        Buckets0
    ),
    if
        AlreadyQueued -> Buckets0;
        true -> [sets:add_element(ShardName, B) | Bs]
    end.
%% Returns true when ShardName is a member of at least one of the given
%% buckets, false otherwise (including for an empty bucket list).
waiting(ShardName, Buckets) ->
    lists:any(fun(Bucket) -> sets:is_element(ShardName, Bucket) end, Buckets).
%% Resizes the bucket list so that it holds `Delay div Frequency + 1` buckets,
%% the same formula init/1 uses.
%%
%% Arguments are taken in the order the callers in handle_cast/2 pass them:
%% Delay first, then Frequency. (With the two names swapped in the head the
%% count was computed as `Frequency div Delay + 1`.)
%%
%% Returns the new bucket list. No shard is dropped: when shrinking, the
%% surplus buckets are merged; when growing, empty buckets are prepended.
rebucket_shards(Delay, Frequency, Buckets0) ->
    case (Delay div Frequency + 1) - length(Buckets0) of
        0 ->
            Buckets0;
        N when N < 0 ->
            %% Too many buckets: merge the first abs(N) + 1 buckets (the head
            %% of the list) into a single bucket and keep the rest as they are.
            {ToMerge, [B | Buckets1]} = lists:split(abs(N), Buckets0),
            [sets:union([B | ToMerge]) | Buckets1];
        M ->
            %% Too few buckets: add M empty ones at the head.
            lists:duplicate(M, sets:new()) ++ Buckets0
    end.
%% Single exit point for every message callback. To keep mem3_sync:push/2
%% running at roughly the rate given by #state.frequency, callbacks must
%% return the result of this function instead of building their own return
%% tuple. All timing decisions live here: either the last bucket is pushed
%% now, or a timeout is returned so that queued shards are not left waiting
%% when no further messages arrive.
%%
%% Returns {ok, State, TimeoutMs}.
maybe_push_shards(#state{last_push = undefined} = State) ->
    %% First call: start the clock and wait one full interval.
    Started = State#state{last_push = os:timestamp()},
    {ok, Started, State#state.frequency};
maybe_push_shards(State) ->
    #state{frequency = Frequency, last_push = LastPush, buckets = Buckets0} = State,
    Now = os:timestamp(),
    %% now_diff/2 is in microseconds; the frequency is compared in milliseconds.
    ElapsedMs = timer:now_diff(Now, LastPush) div 1000,
    if
        ElapsedMs > Frequency ->
            %% Take the last bucket off the list and put a fresh one in front.
            KeepCount = length(Buckets0) - 1,
            {Kept, [Oldest]} = lists:split(KeepCount, Buckets0),
            Rotated = [sets:new() | Kept],
            %% sets has no map/foreach, so fold with a throwaway accumulator.
            sets:fold(
                fun(ShardName, _Ignored) -> push_shard(ShardName) end,
                undefined,
                Oldest
            ),
            {ok, State#state{last_push = Now, buckets = Rotated}, Frequency};
        true ->
            %% Not due yet: wake up when the remainder of the interval is over.
            {ok, State, Frequency - ElapsedMs}
    end.
%% Pushes ShardName to every node returned by mem3_shards:for_shard_range/1
%% that is currently connected. A `database_does_not_exist` error from the
%% lookup is ignored.
push_shard(ShardName) ->
    try mem3_shards:for_shard_range(ShardName) of
        Shards ->
            Connected = nodes(),
            Reachable = [
                Shard
             || Shard <- Shards, lists:member(Shard#shard.node, Connected)
            ],
            lists:foreach(
                fun(#shard{node = Node}) -> mem3_sync:push(ShardName, Node) end,
                Reachable
            )
    catch
        error:database_does_not_exist ->
            ok
    end.
%% Subscribes this process to changes of mem3.sync_delay and
%% mem3.sync_frequency; returns whatever config:subscribe_for_changes/1 returns.
subscribe_for_config() ->
    Section = "mem3",
    Watched = [{Section, Key} || Key <- ["sync_delay", "sync_frequency"]],
    config:subscribe_for_changes(Watched).
%% Suite-level fixture: starts the config application and installs the mocks.
%% couch_event:register_all/1 is stubbed to return ok, and config_notifier's
%% handle_event/2 throws `raised_error` for events whose value is "error"
%% while passing every other event through to the real implementation.
setup_all() ->
    application:start(config),
    ok = meck:new(couch_event, [passthrough]),
    ok = meck:expect(couch_event, register_all, ['_'], ok),
    ok = meck:new(config_notifier, [passthrough]),
    RaiseOnErrorValue =
        {[{'_', '_', '_', "error", '_'}, '_'], meck:raise(throw, raised_error)},
    PassThroughOthers = {['_', '_'], meck:passthrough()},
    ok = meck:expect(
        config_notifier,
        handle_event,
        [RaiseOnErrorValue, PassThroughOthers]
    ).
%% Suite-level cleanup: removes every meck mock, then stops the config
%% application. The fixture value from setup_all/0 is not needed.
teardown_all(_Fixture) ->
    meck:unload(),
    application:stop(config).
%% Per-test fixture: starts a listener, unlinks it from the test process,
%% waits until it shows up as a config subscriber, and returns its pid.
setup() ->
    {ok, Listener} = ?MODULE:start_link(),
    erlang:unlink(Listener),
    wait_config_subscribed(Listener),
    Listener.
%% Per-test cleanup: sends the listener a `shutdown` exit signal.
teardown(Pid) ->
    Reason = shutdown,
    exit(Pid, Reason).
%% EUnit generator: one suite-level setup/teardown pair wrapping a fresh
%% listener (setup/0, teardown/1) for each individual test case.
subscribe_for_config_test_() ->
    Cases = [
        fun should_set_sync_delay/1,
        fun should_set_sync_frequency/1,
        fun should_restart_listener/1,
        fun should_terminate/1
    ],
    PerCase = {foreach, fun setup/0, fun teardown/1, Cases},
    Suite = {setup, fun setup_all/0, fun teardown_all/1, PerCase},
    {"Subscribe for configuration changes", Suite}.
%% Setting mem3.sync_delay in config must end up in the listener's
%% #state.delay field.
should_set_sync_delay(Listener) ->
    ?_test(begin
        config:set("mem3", "sync_delay", "123", false),
        %% The change is applied asynchronously, so poll before asserting.
        wait_state(Listener, #state.delay, 123),
        ?assertMatch(#state{delay = 123}, get_state(Listener)),
        ok
    end).
%% Setting mem3.sync_frequency in config must end up in the listener's
%% #state.frequency field.
should_set_sync_frequency(Listener) ->
    ?_test(begin
        config:set("mem3", "sync_frequency", "456", false),
        %% The change is applied asynchronously, so poll before asserting.
        wait_state(Listener, #state.frequency, 456),
        ?assertMatch(#state{frequency = 456}, get_state(Listener)),
        ok
    end).
%% Setting a value of "error" makes the mocked config_notifier throw (see
%% setup_all/0); the test then waits up to one second for a new call to
%% config_notifier:subscribe.
should_restart_listener(_Listener) ->
    ?_test(begin
        meck:reset(config_notifier),
        config:set("mem3", "sync_frequency", "error", false),
        meck:wait(config_notifier, subscribe, '_', 1000),
        ok
    end).
%% Killing the config_event manager must bring the listener down: the test
%% monitors the listener, kills and restarts config_event, and expects a
%% 'DOWN' message within one second. On timeout it prints diagnostics and
%% fails. Finally it checks that config_event is running again.
should_terminate(Pid) ->
    ?_test(begin
        ?assert(is_process_alive(Pid)),
        EventMgr = whereis(config_event),
        %% Recorded only for the diagnostic output below.
        EventMgrWasAlive = (catch is_process_alive(EventMgr)),
        MonitorRef = erlang:monitor(process, Pid),
        KillEventMgr = fun() -> exit(EventMgr, kill) end,
        {_, _} = test_util:with_process_restart(config_event, KillEventMgr),
        ?assertNot(is_process_alive(EventMgr)),
        receive
            {'DOWN', MonitorRef, _, _, _} ->
                ok
        after 1000 ->
            ?debugFmt("~n XKCD should_terminate EventMgrWasAlive:~p MsgQueue:~p PInfo:~p ~n", [
                EventMgrWasAlive, process_info(self(), messages), process_info(Pid)
            ]),
            ?assert(false)
        end,
        ?assert(is_process_alive(whereis(config_event))),
        ok
    end).
%% Test helper: asks Pid for its state with a {get_state, Ref, Caller}
%% message and returns the reply, or the atom `timeout` if nothing tagged with
%% the same reference arrives within 500 ms.
get_state(Pid) ->
    Tag = make_ref(),
    Request = {get_state, Tag, self()},
    Pid ! Request,
    receive
        {Tag, Reply} -> Reply
    after 500 ->
        timeout
    end.
%% Test helper: polls the listener through test_util:wait/1 until the record
%% element at index Field (e.g. `#state.delay`) compares equal to Val. Any
%% other reply from get_state/1, including `timeout`, means "keep waiting".
wait_state(Pid, Field, Val) when is_pid(Pid), is_integer(Field) ->
    test_util:wait(fun() ->
        case get_state(Pid) of
            #state{} = Current when element(Field, Current) == Val -> true;
            _NotYet -> wait
        end
    end).
%% Test helper: polls through test_util:wait/1 until Pid appears as the id of
%% a config_notifier handler registered with the config_event manager.
wait_config_subscribed(Pid) ->
    test_util:wait(fun() ->
        Registered = gen_event:which_handlers(config_event),
        Subscribers = [Id || {config_notifier, Id} <- Registered],
        case lists:member(Pid, Subscribers) of
            true -> true;
            false -> wait
        end
    end).