diff options
author | Jan Lehnardt <jan@apache.org> | 2017-10-08 15:36:31 +0200 |
---|---|---|
committer | Joan Touzet <joant@atypical.net> | 2017-10-10 17:23:37 -0400 |
commit | f0c28835a6a8d4b08de4eaf94b7d07d2d4865e8c (patch) | |
tree | 643682da76c66080c1795e8a27479ead30e8ad15 | |
parent | e34d0484c561600162f714d251decbea73abd423 (diff) | |
download | couchdb-f0c28835a6a8d4b08de4eaf94b7d07d2d4865e8c.tar.gz |
make sure peruser listeners are only initialised once per node
-rw-r--r-- | src/couch_peruser/src/couch_peruser.erl | 59 |
1 files changed, 36 insertions, 23 deletions
diff --git a/src/couch_peruser/src/couch_peruser.erl b/src/couch_peruser/src/couch_peruser.erl index 791431c4a..e722b7ec4 100644 --- a/src/couch_peruser/src/couch_peruser.erl +++ b/src/couch_peruser/src/couch_peruser.erl @@ -32,7 +32,14 @@ cluster_unstable/1 ]). --record(state, {parent, db_name, delete_dbs, changes_pid, changes_ref}). +-record(state, { + parent, + db_name, + delete_dbs, + changes_pid, + changes_ref +}). + -record(clusterState, { parent, db_name, @@ -47,6 +54,9 @@ -define(DEFAULT_QUIET_PERIOD, 60). % seconds -define(DEFAULT_START_PERIOD, 5). % seconds +%% +%% Please leave in the commented-out couch_log:debug calls, thanks! — Jan +%% start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). @@ -72,8 +82,6 @@ init() -> {ok, Mem3Cluster} = mem3_cluster:start_link(?MODULE, self(), StartPeriod, Period), - couch_log:debug("peruser: registered for cluster event on node ~p", [node()]), - #clusterState{ parent = self(), db_name = DbName, @@ -86,11 +94,14 @@ init() -> % Cluster membership change notification callback -spec notify_cluster_event(pid(), {cluster, any()}) -> ok. notify_cluster_event(Server, {cluster, _} = Event) -> - couch_log:debug("peruser: received cluster event ~p on node ~p", [Event, node()]), + % couch_log:debug("peruser: received cluster event ~p on node ~p", [Event, node()]), gen_server:cast(Server, Event). +start_listening(#clusterState{states=States}=ClusterState) when length(States) > 0 -> + % couch_log:debug("peruser: start_listening() already run on node ~p in pid ~p", [node(), self()]), + ClusterState; start_listening(#clusterState{db_name=DbName, delete_dbs=DeleteDbs} = ClusterState) -> - couch_log:debug("peruser: start_listening() on node ~p", [node()]), + % couch_log:debug("peruser: start_listening() on node ~p", [node()]), try States = lists:map(fun (A) -> S = #state{parent = ClusterState#clusterState.parent, @@ -100,6 +111,7 @@ start_listening(#clusterState{db_name=DbName, delete_dbs=DeleteDbs} = ClusterSta ?MODULE, init_changes_handler, [S], [link, monitor]), S#state{changes_pid=Pid, changes_ref=Ref} end, mem3:local_shards(DbName)), + % couch_log:debug("peruser: start_listening() States ~p", [States]), ClusterState#clusterState{states = States, cluster_stable = true} catch error:database_does_not_exist -> @@ -108,7 +120,6 @@ start_listening(#clusterState{db_name=DbName, delete_dbs=DeleteDbs} = ClusterSta end. init_changes_handler(#state{db_name=DbName} = State) -> - % leave for debugging % couch_log:debug("peruser: init_changes_handler() on DbName ~p", [DbName]), try {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX, sys_db]), @@ -123,7 +134,6 @@ init_changes_handler(#state{db_name=DbName} = State) -> changes_handler({change, {Doc}, _Prepend}, _ResType, State=#state{db_name=DbName}) -> - % leave for debugging % couch_log:debug("peruser: changes_handler() on DbName/Doc ~p/~p", [DbName, Doc]), case couch_util:get_value(<<"id">>, Doc) of @@ -155,28 +165,31 @@ changes_handler({change, {Doc}, _Prepend}, _ResType, State=#state{db_name=DbName changes_handler(_Event, _ResType, State) -> State. + should_handle_doc(ShardName, DocId) -> - should_handle_doc_int(ShardName, DocId, is_stable()). - -should_handle_doc_int(ShardName, DocId, false) -> - % when the cluster is unstable, we have already stopped all Listeners - % the next stable event will restart all listeners and pick up this - % doc change - couch_log:debug("peruser: skipping, cluster unstable ~s/~s", [ShardName, DocId]), - false; -should_handle_doc_int(ShardName, DocId, true) -> + case is_stable() of + false -> + % when the cluster is unstable, we have already stopped all Listeners + % the next stable event will restart all listeners and pick up this + % doc change + couch_log:debug("peruser: skipping, cluster unstable ~s/~s", [ShardName, DocId]), + false; + true -> + should_handle_doc_int(ShardName, DocId) + end. + +should_handle_doc_int(ShardName, DocId) -> DbName = mem3:dbname(ShardName), Live = [erlang:node() | erlang:nodes()], Shards = mem3:shards(DbName, DocId), Nodes = [N || #shard{node=N} <- Shards, lists:member(N, Live)], case mem3:owner(DbName, DocId, Nodes) of - ThisNode when ThisNode =:= node() -> - couch_log:debug("peruser: handling ~s/~s", [DbName, DocId]), - % do the deed - true; - _OtherNode -> - couch_log:debug("peruser: skipping ~s/~s", [DbName, DocId]), - false + ThisNode when ThisNode =:= node() -> + couch_log:debug("peruser: handling ~s/~s", [DbName, DocId]), + true; % do the database action + _OtherNode -> + couch_log:debug("peruser: skipping ~s/~s", [DbName, DocId]), + false end. |