diff options
author | Jan Lehnardt <jan@apache.org> | 2017-10-07 17:04:54 +0200 |
---|---|---|
committer | Joan Touzet <joant@atypical.net> | 2017-10-10 17:23:37 -0400 |
commit | 599dcb32a7609359aa7e170d7ef8e95dd539b10d (patch) | |
tree | 14f216a20f1008ad49d1d38c42b68ca4aac5d282 | |
parent | 7da9d8bb7e0b54aad9ddc17681f71c4cf7ec66d7 (diff) | |
download | couchdb-599dcb32a7609359aa7e170d7ef8e95dd539b10d.tar.gz |
Ensure a user creation is handlined on one node only
This patch makes use of the mechanism that ensures that replications
are only run on one node.
When the cluster has nodes added/removed all changes listeners are
restarted.
-rw-r--r-- | src/couch_peruser/src/couch_peruser.app.src | 2 | ||||
-rw-r--r-- | src/couch_peruser/src/couch_peruser.erl | 158 |
2 files changed, 110 insertions, 50 deletions
diff --git a/src/couch_peruser/src/couch_peruser.app.src b/src/couch_peruser/src/couch_peruser.app.src index 777446d94..42b7b25b2 100644 --- a/src/couch_peruser/src/couch_peruser.app.src +++ b/src/couch_peruser/src/couch_peruser.app.src @@ -14,7 +14,7 @@ {description, "couch_peruser - maintains per-user databases in CouchDB"}, {vsn, git}, {registered, []}, - {applications, [kernel, stdlib, config, couch, fabric]}, + {applications, [kernel, stdlib, config, couch, fabric, couch_replicator, mem3]}, {mod, {couch_peruser_app, []}}, {env, []}, {modules, [couch_peruser, couch_peruser_app, couch_peruser_sup]} diff --git a/src/couch_peruser/src/couch_peruser.erl b/src/couch_peruser/src/couch_peruser.erl index 63ef084ce..9161f56e0 100644 --- a/src/couch_peruser/src/couch_peruser.erl +++ b/src/couch_peruser/src/couch_peruser.erl @@ -22,6 +22,9 @@ -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +% cluster state notification callback +-export([notify_cluster_event/2]). + -export([init_changes_handler/1, changes_handler/3]). -record(state, {parent, db_name, delete_dbs, changes_pid, changes_ref}). @@ -34,10 +37,13 @@ start_link() -> gen_server:start_link(?MODULE, [], []). init() -> + couch_log:debug("peruser: starting on node ~p", [node()]), case config:get_boolean("couch_peruser", "enable", false) of false -> + couch_log:debug("peruser: disabled on node ~p", [node()]), #clusterState{}; true -> + couch_log:debug("peruser: enabled on node ~p", [node()]), DbName = ?l2b(config:get( "couch_httpd_auth", "authentication_db", "_users")), DeleteDbs = config:get_boolean("couch_peruser", "delete_dbs", false), @@ -47,21 +53,37 @@ init() -> db_name = DbName, delete_dbs = DeleteDbs }, - try - States = lists:map(fun (A) -> - S = #state{parent = ClusterState#clusterState.parent, - db_name = A#shard.name, - delete_dbs = DeleteDbs}, - {Pid, Ref} = spawn_opt( - ?MODULE, init_changes_handler, [S], [link, monitor]), - S#state{changes_pid=Pid, changes_ref=Ref} - end, mem3:local_shards(DbName)), - - ClusterState#clusterState{states = States} - catch error:database_does_not_exist -> - couch_log:warning("couch_peruser can't proceed as underlying database (~s) is missing, disables itself.", [DbName]), - config:set("couch_peruser", "enable", "false", lists:concat([binary_to_list(DbName), " is missing"])) - end + + % set up cluster-stable listener + couch_replicator_clustering:link_cluster_event_listener(?MODULE, + notify_cluster_event, [self()]), + + couch_log:debug("peruser: registered for cluster event on node ~p", [node()]), + ClusterState + end. + +% Cluster membership change notification callback +-spec notify_cluster_event(pid(), {cluster, any()}) -> ok. +notify_cluster_event(Server, {cluster, _} = Event) -> + couch_log:debug("peruser: received cluster event ~p on node ~p", [Event, node()]), + gen_server:cast(Server, Event). + +start_listening(#clusterState{db_name=DbName, delete_dbs=DeleteDbs} = ClusterState) -> + couch_log:debug("peruser: start_listening() on node ~p", [node()]), + try + States = lists:map(fun (A) -> + S = #state{parent = ClusterState#clusterState.parent, + db_name = A#shard.name, + delete_dbs = DeleteDbs}, + {Pid, Ref} = spawn_opt( + ?MODULE, init_changes_handler, [S], [link, monitor]), + S#state{changes_pid=Pid, changes_ref=Ref} + end, mem3:local_shards(DbName)), + + ClusterState#clusterState{states = States} + catch error:database_does_not_exist -> + couch_log:warning("couch_peruser can't proceed as underlying database (~s) is missing, disables itself.", [DbName]), + config:set("couch_peruser", "enable", "false", lists:concat([binary_to_list(DbName), " is missing"])) end. init_changes_handler(#state{db_name=DbName} = State) -> @@ -76,24 +98,30 @@ init_changes_handler(#state{db_name=DbName} = State) -> ok end. -changes_handler({change, {Doc}, _Prepend}, _ResType, State=#state{}) -> + +changes_handler({change, {Doc}, _Prepend}, _ResType, State=#state{db_name=DbName}) -> case couch_util:get_value(<<"id">>, Doc) of - <<"org.couchdb.user:",User/binary>> -> - case couch_util:get_value(<<"deleted">>, Doc, false) of - false -> - UserDb = ensure_user_db(User), - ok = ensure_security(User, UserDb, fun add_user/3), - State; + <<"org.couchdb.user:",User/binary>>=DocId -> + case should_handle_doc(DbName, DocId) of true -> - case State#state.delete_dbs of - true -> - _UserDb = delete_user_db(User), - State; + case couch_util:get_value(<<"deleted">>, Doc, false) of false -> - UserDb = user_db_name(User), - ok = ensure_security(User, UserDb, fun remove_user/3), - State - end + UserDb = ensure_user_db(User), + ok = ensure_security(User, UserDb, fun add_user/3), + State; + true -> + case State#state.delete_dbs of + true -> + _UserDb = delete_user_db(User), + State; + false -> + UserDb = user_db_name(User), + ok = ensure_security(User, UserDb, fun remove_user/3), + State + end + end; + false -> + State end; _ -> State @@ -101,6 +129,25 @@ changes_handler({change, {Doc}, _Prepend}, _ResType, State=#state{}) -> changes_handler(_Event, _ResType, State) -> State. +should_handle_doc(DbName, DocId) -> + case couch_replicator_clustering:owner(DbName, DocId) of + unstable -> + % todo: when we do proper resume[1], we can return false here + % and rely on a module restart when the cluster is stable again + % in the meantime, we risk conflicts when the cluster gets unstable + % and users are being created. + % [1] https://github.com/apache/couchdb/issues/872 + true; + ThisNode when ThisNode =:= node() -> + couch_log:debug("peruser: handling ~s/~s", [DbName, DocId]), + % do the deed + true; + _OtherNode -> + couch_log:debug("peruser: skipping ~s/~s", [DbName, DocId]), + false + end. + + delete_user_db(User) -> UserDb = user_db_name(User), try @@ -158,20 +205,25 @@ remove_user(User, Prop, {Modified, SecProps}) -> end. ensure_security(User, UserDb, TransformFun) -> - {ok, Shards} = fabric:get_all_security(UserDb, [?ADMIN_CTX]), - {_ShardInfo, {SecProps}} = hd(Shards), - % assert that shards have the same security object - true = lists:all(fun ({_, {SecProps1}}) -> - SecProps =:= SecProps1 - end, Shards), - case lists:foldl( - fun (Prop, SAcc) -> TransformFun(User, Prop, SAcc) end, - {false, SecProps}, - [<<"admins">>, <<"members">>]) of - {false, _} -> - ok; - {true, SecProps1} -> - ok = fabric:set_security(UserDb, {SecProps1}, [?ADMIN_CTX]) + case fabric:get_all_security(UserDb, [?ADMIN_CTX]) of + {error, no_majority} -> + % single node, ignore + ok; + {ok, Shards} -> + {_ShardInfo, {SecProps}} = hd(Shards), + % assert that shards have the same security object + true = lists:all(fun ({_, {SecProps1}}) -> + SecProps =:= SecProps1 + end, Shards), + case lists:foldl( + fun (Prop, SAcc) -> TransformFun(User, Prop, SAcc) end, + {false, SecProps}, + [<<"admins">>, <<"members">>]) of + {false, _} -> + ok; + {true, SecProps1} -> + ok = fabric:set_security(UserDb, {SecProps1}, [?ADMIN_CTX]) + end end. user_db_name(User) -> @@ -179,6 +231,11 @@ user_db_name(User) -> [string:to_lower(integer_to_list(X, 16)) || <<X>> <= User]), <<?USERDB_PREFIX,HexUser/binary>>. +exit_changes(ClusterState) -> + lists:foreach(fun (State) -> + demonitor(State#state.changes_ref, [flush]), + exit(State#state.changes_pid, kill) + end, ClusterState#clusterState.states). %% gen_server callbacks @@ -191,16 +248,19 @@ handle_call(_Msg, _From, State) -> handle_cast(update_config, ClusterState) when ClusterState#clusterState.states =/= undefined -> - lists:foreach(fun (State) -> - demonitor(State#state.changes_ref, [flush]), - exit(State#state.changes_pid, kill) - end, ClusterState#clusterState.states), - + exit_changes(ClusterState), {noreply, init()}; handle_cast(update_config, _) -> {noreply, init()}; handle_cast(stop, State) -> {stop, normal, State}; +handle_cast({cluster, unstable}, ClusterState) when ClusterState#clusterState.states =/= undefined -> + exit_changes(ClusterState), + {noreply, init()}; +handle_cast({cluster, unstable}, _) -> + {noreply, init()}; +handle_cast({cluster, stable}, State) -> + {noreply, start_listening(State)}; handle_cast(_Msg, State) -> {noreply, State}. |