summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJan Lehnardt <jan@apache.org>2017-10-07 17:04:54 +0200
committerJoan Touzet <joant@atypical.net>2017-10-10 17:23:37 -0400
commit599dcb32a7609359aa7e170d7ef8e95dd539b10d (patch)
tree14f216a20f1008ad49d1d38c42b68ca4aac5d282
parent7da9d8bb7e0b54aad9ddc17681f71c4cf7ec66d7 (diff)
downloadcouchdb-599dcb32a7609359aa7e170d7ef8e95dd539b10d.tar.gz
Ensure a user creation is handlined on one node only
This patch makes use of the mechanism that ensures that replications are only run on one node. When the cluster has nodes added/removed all changes listeners are restarted.
-rw-r--r--src/couch_peruser/src/couch_peruser.app.src2
-rw-r--r--src/couch_peruser/src/couch_peruser.erl158
2 files changed, 110 insertions, 50 deletions
diff --git a/src/couch_peruser/src/couch_peruser.app.src b/src/couch_peruser/src/couch_peruser.app.src
index 777446d94..42b7b25b2 100644
--- a/src/couch_peruser/src/couch_peruser.app.src
+++ b/src/couch_peruser/src/couch_peruser.app.src
@@ -14,7 +14,7 @@
{description, "couch_peruser - maintains per-user databases in CouchDB"},
{vsn, git},
{registered, []},
- {applications, [kernel, stdlib, config, couch, fabric]},
+ {applications, [kernel, stdlib, config, couch, fabric, couch_replicator, mem3]},
{mod, {couch_peruser_app, []}},
{env, []},
{modules, [couch_peruser, couch_peruser_app, couch_peruser_sup]}
diff --git a/src/couch_peruser/src/couch_peruser.erl b/src/couch_peruser/src/couch_peruser.erl
index 63ef084ce..9161f56e0 100644
--- a/src/couch_peruser/src/couch_peruser.erl
+++ b/src/couch_peruser/src/couch_peruser.erl
@@ -22,6 +22,9 @@
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
+% cluster state notification callback
+-export([notify_cluster_event/2]).
+
-export([init_changes_handler/1, changes_handler/3]).
-record(state, {parent, db_name, delete_dbs, changes_pid, changes_ref}).
@@ -34,10 +37,13 @@ start_link() ->
gen_server:start_link(?MODULE, [], []).
init() ->
+ couch_log:debug("peruser: starting on node ~p", [node()]),
case config:get_boolean("couch_peruser", "enable", false) of
false ->
+ couch_log:debug("peruser: disabled on node ~p", [node()]),
#clusterState{};
true ->
+ couch_log:debug("peruser: enabled on node ~p", [node()]),
DbName = ?l2b(config:get(
"couch_httpd_auth", "authentication_db", "_users")),
DeleteDbs = config:get_boolean("couch_peruser", "delete_dbs", false),
@@ -47,21 +53,37 @@ init() ->
db_name = DbName,
delete_dbs = DeleteDbs
},
- try
- States = lists:map(fun (A) ->
- S = #state{parent = ClusterState#clusterState.parent,
- db_name = A#shard.name,
- delete_dbs = DeleteDbs},
- {Pid, Ref} = spawn_opt(
- ?MODULE, init_changes_handler, [S], [link, monitor]),
- S#state{changes_pid=Pid, changes_ref=Ref}
- end, mem3:local_shards(DbName)),
-
- ClusterState#clusterState{states = States}
- catch error:database_does_not_exist ->
- couch_log:warning("couch_peruser can't proceed as underlying database (~s) is missing, disables itself.", [DbName]),
- config:set("couch_peruser", "enable", "false", lists:concat([binary_to_list(DbName), " is missing"]))
- end
+
+ % set up cluster-stable listener
+ couch_replicator_clustering:link_cluster_event_listener(?MODULE,
+ notify_cluster_event, [self()]),
+
+ couch_log:debug("peruser: registered for cluster event on node ~p", [node()]),
+ ClusterState
+ end.
+
+% Cluster membership change notification callback
+-spec notify_cluster_event(pid(), {cluster, any()}) -> ok.
+notify_cluster_event(Server, {cluster, _} = Event) ->
+ couch_log:debug("peruser: received cluster event ~p on node ~p", [Event, node()]),
+ gen_server:cast(Server, Event).
+
+start_listening(#clusterState{db_name=DbName, delete_dbs=DeleteDbs} = ClusterState) ->
+ couch_log:debug("peruser: start_listening() on node ~p", [node()]),
+ try
+ States = lists:map(fun (A) ->
+ S = #state{parent = ClusterState#clusterState.parent,
+ db_name = A#shard.name,
+ delete_dbs = DeleteDbs},
+ {Pid, Ref} = spawn_opt(
+ ?MODULE, init_changes_handler, [S], [link, monitor]),
+ S#state{changes_pid=Pid, changes_ref=Ref}
+ end, mem3:local_shards(DbName)),
+
+ ClusterState#clusterState{states = States}
+ catch error:database_does_not_exist ->
+ couch_log:warning("couch_peruser can't proceed as underlying database (~s) is missing, disables itself.", [DbName]),
+ config:set("couch_peruser", "enable", "false", lists:concat([binary_to_list(DbName), " is missing"]))
end.
init_changes_handler(#state{db_name=DbName} = State) ->
@@ -76,24 +98,30 @@ init_changes_handler(#state{db_name=DbName} = State) ->
ok
end.
-changes_handler({change, {Doc}, _Prepend}, _ResType, State=#state{}) ->
+
+changes_handler({change, {Doc}, _Prepend}, _ResType, State=#state{db_name=DbName}) ->
case couch_util:get_value(<<"id">>, Doc) of
- <<"org.couchdb.user:",User/binary>> ->
- case couch_util:get_value(<<"deleted">>, Doc, false) of
- false ->
- UserDb = ensure_user_db(User),
- ok = ensure_security(User, UserDb, fun add_user/3),
- State;
+ <<"org.couchdb.user:",User/binary>>=DocId ->
+ case should_handle_doc(DbName, DocId) of
true ->
- case State#state.delete_dbs of
- true ->
- _UserDb = delete_user_db(User),
- State;
+ case couch_util:get_value(<<"deleted">>, Doc, false) of
false ->
- UserDb = user_db_name(User),
- ok = ensure_security(User, UserDb, fun remove_user/3),
- State
- end
+ UserDb = ensure_user_db(User),
+ ok = ensure_security(User, UserDb, fun add_user/3),
+ State;
+ true ->
+ case State#state.delete_dbs of
+ true ->
+ _UserDb = delete_user_db(User),
+ State;
+ false ->
+ UserDb = user_db_name(User),
+ ok = ensure_security(User, UserDb, fun remove_user/3),
+ State
+ end
+ end;
+ false ->
+ State
end;
_ ->
State
@@ -101,6 +129,25 @@ changes_handler({change, {Doc}, _Prepend}, _ResType, State=#state{}) ->
changes_handler(_Event, _ResType, State) ->
State.
+should_handle_doc(DbName, DocId) ->
+ case couch_replicator_clustering:owner(DbName, DocId) of
+ unstable ->
+ % todo: when we do proper resume[1], we can return false here
+ % and rely on a module restart when the cluster is stable again
+ % in the meantime, we risk conflicts when the cluster gets unstable
+ % and users are being created.
+ % [1] https://github.com/apache/couchdb/issues/872
+ true;
+ ThisNode when ThisNode =:= node() ->
+ couch_log:debug("peruser: handling ~s/~s", [DbName, DocId]),
+ % do the deed
+ true;
+ _OtherNode ->
+ couch_log:debug("peruser: skipping ~s/~s", [DbName, DocId]),
+ false
+ end.
+
+
delete_user_db(User) ->
UserDb = user_db_name(User),
try
@@ -158,20 +205,25 @@ remove_user(User, Prop, {Modified, SecProps}) ->
end.
ensure_security(User, UserDb, TransformFun) ->
- {ok, Shards} = fabric:get_all_security(UserDb, [?ADMIN_CTX]),
- {_ShardInfo, {SecProps}} = hd(Shards),
- % assert that shards have the same security object
- true = lists:all(fun ({_, {SecProps1}}) ->
- SecProps =:= SecProps1
- end, Shards),
- case lists:foldl(
- fun (Prop, SAcc) -> TransformFun(User, Prop, SAcc) end,
- {false, SecProps},
- [<<"admins">>, <<"members">>]) of
- {false, _} ->
- ok;
- {true, SecProps1} ->
- ok = fabric:set_security(UserDb, {SecProps1}, [?ADMIN_CTX])
+ case fabric:get_all_security(UserDb, [?ADMIN_CTX]) of
+ {error, no_majority} ->
+ % single node, ignore
+ ok;
+ {ok, Shards} ->
+ {_ShardInfo, {SecProps}} = hd(Shards),
+ % assert that shards have the same security object
+ true = lists:all(fun ({_, {SecProps1}}) ->
+ SecProps =:= SecProps1
+ end, Shards),
+ case lists:foldl(
+ fun (Prop, SAcc) -> TransformFun(User, Prop, SAcc) end,
+ {false, SecProps},
+ [<<"admins">>, <<"members">>]) of
+ {false, _} ->
+ ok;
+ {true, SecProps1} ->
+ ok = fabric:set_security(UserDb, {SecProps1}, [?ADMIN_CTX])
+ end
end.
user_db_name(User) ->
@@ -179,6 +231,11 @@ user_db_name(User) ->
[string:to_lower(integer_to_list(X, 16)) || <<X>> <= User]),
<<?USERDB_PREFIX,HexUser/binary>>.
+exit_changes(ClusterState) ->
+ lists:foreach(fun (State) ->
+ demonitor(State#state.changes_ref, [flush]),
+ exit(State#state.changes_pid, kill)
+ end, ClusterState#clusterState.states).
%% gen_server callbacks
@@ -191,16 +248,19 @@ handle_call(_Msg, _From, State) ->
handle_cast(update_config, ClusterState) when ClusterState#clusterState.states =/= undefined ->
- lists:foreach(fun (State) ->
- demonitor(State#state.changes_ref, [flush]),
- exit(State#state.changes_pid, kill)
- end, ClusterState#clusterState.states),
-
+ exit_changes(ClusterState),
{noreply, init()};
handle_cast(update_config, _) ->
{noreply, init()};
handle_cast(stop, State) ->
{stop, normal, State};
+handle_cast({cluster, unstable}, ClusterState) when ClusterState#clusterState.states =/= undefined ->
+ exit_changes(ClusterState),
+ {noreply, init()};
+handle_cast({cluster, unstable}, _) ->
+ {noreply, init()};
+handle_cast({cluster, stable}, State) ->
+ {noreply, start_listening(State)};
handle_cast(_Msg, State) ->
{noreply, State}.