summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJan Lehnardt <jan@apache.org>2017-10-08 15:36:31 +0200
committerJoan Touzet <joant@atypical.net>2017-10-10 17:23:37 -0400
commitf0c28835a6a8d4b08de4eaf94b7d07d2d4865e8c (patch)
tree643682da76c66080c1795e8a27479ead30e8ad15
parente34d0484c561600162f714d251decbea73abd423 (diff)
downloadcouchdb-f0c28835a6a8d4b08de4eaf94b7d07d2d4865e8c.tar.gz
make sure peruser listeners are only initialised once per node
-rw-r--r--src/couch_peruser/src/couch_peruser.erl59
1 files changed, 36 insertions, 23 deletions
diff --git a/src/couch_peruser/src/couch_peruser.erl b/src/couch_peruser/src/couch_peruser.erl
index 791431c4a..e722b7ec4 100644
--- a/src/couch_peruser/src/couch_peruser.erl
+++ b/src/couch_peruser/src/couch_peruser.erl
@@ -32,7 +32,14 @@
cluster_unstable/1
]).
--record(state, {parent, db_name, delete_dbs, changes_pid, changes_ref}).
+-record(state, {
+ parent,
+ db_name,
+ delete_dbs,
+ changes_pid,
+ changes_ref
+}).
+
-record(clusterState, {
parent,
db_name,
@@ -47,6 +54,9 @@
-define(DEFAULT_QUIET_PERIOD, 60). % seconds
-define(DEFAULT_START_PERIOD, 5). % seconds
+%%
+%% Please leave in the commented-out couch_log:debug calls, thanks! — Jan
+%%
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
@@ -72,8 +82,6 @@ init() ->
{ok, Mem3Cluster} = mem3_cluster:start_link(?MODULE, self(), StartPeriod,
Period),
- couch_log:debug("peruser: registered for cluster event on node ~p", [node()]),
-
#clusterState{
parent = self(),
db_name = DbName,
@@ -86,11 +94,14 @@ init() ->
% Cluster membership change notification callback
-spec notify_cluster_event(pid(), {cluster, any()}) -> ok.
notify_cluster_event(Server, {cluster, _} = Event) ->
- couch_log:debug("peruser: received cluster event ~p on node ~p", [Event, node()]),
+ % couch_log:debug("peruser: received cluster event ~p on node ~p", [Event, node()]),
gen_server:cast(Server, Event).
+start_listening(#clusterState{states=States}=ClusterState) when length(States) > 0 ->
+ % couch_log:debug("peruser: start_listening() already run on node ~p in pid ~p", [node(), self()]),
+ ClusterState;
start_listening(#clusterState{db_name=DbName, delete_dbs=DeleteDbs} = ClusterState) ->
- couch_log:debug("peruser: start_listening() on node ~p", [node()]),
+ % couch_log:debug("peruser: start_listening() on node ~p", [node()]),
try
States = lists:map(fun (A) ->
S = #state{parent = ClusterState#clusterState.parent,
@@ -100,6 +111,7 @@ start_listening(#clusterState{db_name=DbName, delete_dbs=DeleteDbs} = ClusterSta
?MODULE, init_changes_handler, [S], [link, monitor]),
S#state{changes_pid=Pid, changes_ref=Ref}
end, mem3:local_shards(DbName)),
+ % couch_log:debug("peruser: start_listening() States ~p", [States]),
ClusterState#clusterState{states = States, cluster_stable = true}
catch error:database_does_not_exist ->
@@ -108,7 +120,6 @@ start_listening(#clusterState{db_name=DbName, delete_dbs=DeleteDbs} = ClusterSta
end.
init_changes_handler(#state{db_name=DbName} = State) ->
- % leave for debugging
% couch_log:debug("peruser: init_changes_handler() on DbName ~p", [DbName]),
try
{ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX, sys_db]),
@@ -123,7 +134,6 @@ init_changes_handler(#state{db_name=DbName} = State) ->
changes_handler({change, {Doc}, _Prepend}, _ResType, State=#state{db_name=DbName}) ->
- % leave for debugging
% couch_log:debug("peruser: changes_handler() on DbName/Doc ~p/~p", [DbName, Doc]),
case couch_util:get_value(<<"id">>, Doc) of
@@ -155,28 +165,31 @@ changes_handler({change, {Doc}, _Prepend}, _ResType, State=#state{db_name=DbName
changes_handler(_Event, _ResType, State) ->
State.
+
should_handle_doc(ShardName, DocId) ->
- should_handle_doc_int(ShardName, DocId, is_stable()).
-
-should_handle_doc_int(ShardName, DocId, false) ->
- % when the cluster is unstable, we have already stopped all Listeners
- % the next stable event will restart all listeners and pick up this
- % doc change
- couch_log:debug("peruser: skipping, cluster unstable ~s/~s", [ShardName, DocId]),
- false;
-should_handle_doc_int(ShardName, DocId, true) ->
+ case is_stable() of
+ false ->
+ % when the cluster is unstable, we have already stopped all Listeners
+ % the next stable event will restart all listeners and pick up this
+ % doc change
+ couch_log:debug("peruser: skipping, cluster unstable ~s/~s", [ShardName, DocId]),
+ false;
+ true ->
+ should_handle_doc_int(ShardName, DocId)
+ end.
+
+should_handle_doc_int(ShardName, DocId) ->
DbName = mem3:dbname(ShardName),
Live = [erlang:node() | erlang:nodes()],
Shards = mem3:shards(DbName, DocId),
Nodes = [N || #shard{node=N} <- Shards, lists:member(N, Live)],
case mem3:owner(DbName, DocId, Nodes) of
- ThisNode when ThisNode =:= node() ->
- couch_log:debug("peruser: handling ~s/~s", [DbName, DocId]),
- % do the deed
- true;
- _OtherNode ->
- couch_log:debug("peruser: skipping ~s/~s", [DbName, DocId]),
- false
+ ThisNode when ThisNode =:= node() ->
+ couch_log:debug("peruser: handling ~s/~s", [DbName, DocId]),
+ true; % do the database action
+ _OtherNode ->
+ couch_log:debug("peruser: skipping ~s/~s", [DbName, DocId]),
+ false
end.