% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(mem3_sync).
-behaviour(gen_server).
-vsn(1).

-export([
    init/1,
    handle_call/3,
    handle_cast/2,
    handle_info/2,
    terminate/2,
    code_change/3
]).
-export([
    start_link/0,
    get_active/0,
    get_queue/0,
    push/1,
    push/2,
    remove_node/1,
    remove_shard/1,
    initial_sync/1,
    get_backlog/0,
    nodes_db/0,
    shards_db/0,
    users_db/0,
    find_next_node/0
]).
-export([
    local_dbs/0
]).

-import(queue, [in/2, out/1, to_list/1, join/2, from_list/1, is_empty/1]).

-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").

-record(state, {
    % currently running push replication jobs (#job{} records with worker pids)
    active = [],
    % length of `active`
    count = 0,
    % max concurrent replications, from the mem3/sync_concurrency setting
    limit,
    % {DbName, Node} keys of queued jobs, used to de-duplicate pushes
    dict = dict:new(),
    % #job{} records waiting for a free replication slot
    waiting = queue:new()
}).

% `count` holds the pending-changes total reported by the last replication
% attempt (or nil); `pid` is the replication worker once the job is running,
% or the gen_server From of a synchronous push while the job is still queued.
-record(job, {name, node, count = nil, pid = nil}).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

get_active() ->
    gen_server:call(?MODULE, get_active).

get_queue() ->
    gen_server:call(?MODULE, get_queue).

get_backlog() ->
    gen_server:call(?MODULE, get_backlog).

push(#shard{name = Name}, Target) ->
    push(Name, Target);
push(Name, #shard{node = Node}) ->
    push(Name, Node);
push(Name, Node) ->
    push(#job{name = Name, node = Node}).

push(#job{node = Node} = Job) when Node =/= node() ->
    gen_server:cast(?MODULE, {push, Job});
push(_) ->
    ok.

remove_node(Node) ->
    gen_server:cast(?MODULE, {remove_node, Node}).

remove_shard(Shard) ->
    gen_server:cast(?MODULE, {remove_shard, Shard}).

init([]) ->
    process_flag(trap_exit, true),
    Concurrency = config:get("mem3", "sync_concurrency", "10"),
    gen_event:add_handler(mem3_events, mem3_sync_event, []),
    initial_sync(),
    {ok, #state{limit = list_to_integer(Concurrency)}}.

handle_call({push, Job}, From, State) ->
    handle_cast({push, Job#job{pid = From}}, State);
handle_call(get_active, _From, State) ->
    {reply, State#state.active, State};
handle_call(get_queue, _From, State) ->
    {reply, to_list(State#state.waiting), State};
handle_call(get_backlog, _From, #state{active = A, waiting = WQ} = State) ->
    CA = lists:sum([C || #job{count = C} <- A, is_integer(C)]),
    CW = lists:sum([C || #job{count = C} <- to_list(WQ), is_integer(C)]),
    {reply, CA + CW, State}.
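%% A push is queued rather than started when the concurrency limit has been
%% reached, or when an identical {DbName, Node} replication is already active.
%% remove_node/remove_shard kill any matching active workers and drop any
%% matching queued jobs.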
handle_cast({push, DbName, Node}, State) ->
    handle_cast({push, #job{name = DbName, node = Node}}, State);
handle_cast({push, Job}, #state{count = Count, limit = Limit} = State) when Count >= Limit ->
    {noreply, add_to_queue(State, Job)};
handle_cast({push, Job}, State) ->
    #state{active = L, count = C} = State,
    #job{name = DbName, node = Node} = Job,
    case is_running(DbName, Node, L) of
        true ->
            {noreply, add_to_queue(State, Job)};
        false ->
            Pid = start_push_replication(Job),
            {noreply, State#state{active = [Job#job{pid = Pid} | L], count = C + 1}}
    end;
handle_cast({remove_node, Node}, #state{waiting = W0} = State) ->
    {Alive, Dead} = lists:partition(fun(#job{node = N}) -> N =/= Node end, to_list(W0)),
    Dict = remove_entries(State#state.dict, Dead),
    [
        exit(Pid, die_now)
     || #job{node = N, pid = Pid} <- State#state.active,
        N =:= Node
    ],
    {noreply, State#state{dict = Dict, waiting = from_list(Alive)}};
handle_cast({remove_shard, Shard}, #state{waiting = W0} = State) ->
    {Alive, Dead} = lists:partition(
        fun(#job{name = S}) -> S =/= Shard end,
        to_list(W0)
    ),
    Dict = remove_entries(State#state.dict, Dead),
    [
        exit(Pid, die_now)
     || #job{name = S, pid = Pid} <- State#state.active,
        S =:= Shard
    ],
    {noreply, State#state{dict = Dict, waiting = from_list(Alive)}}.

handle_info({'EXIT', Active, normal}, State) ->
    handle_replication_exit(State, Active);
handle_info({'EXIT', Active, die_now}, State) ->
    % we forced this one ourselves, do not retry
    handle_replication_exit(State, Active);
handle_info({'EXIT', Active, {{not_found, no_db_file}, _Stack}}, State) ->
    % target doesn't exist, do not retry
    handle_replication_exit(State, Active);
handle_info({'EXIT', Active, Reason}, State) ->
    NewState =
        case lists:keyfind(Active, #job.pid, State#state.active) of
            #job{name = OldDbName, node = OldNode} = Job ->
                couch_log:warning("~s ~s ~s ~w", [?MODULE, OldDbName, OldNode, Reason]),
                case Reason of
                    {pending_changes, Count} ->
                        maybe_resubmit(State, Job#job{pid = nil, count = Count});
                    _ ->
                        case mem3:db_is_current(Job#job.name) of
                            true ->
                                timer:apply_after(5000, ?MODULE, push, [Job#job{pid = nil}]);
                            false ->
                                % no need to retry (db deleted or recreated)
                                ok
                        end,
                        State
                end;
            false ->
                State
        end,
    handle_replication_exit(NewState, Active);
handle_info(Msg, State) ->
    couch_log:notice("unexpected msg at replication manager ~p", [Msg]),
    {noreply, State}.

terminate(_Reason, State) ->
    [exit(Pid, shutdown) || #job{pid = Pid} <- State#state.active],
    ok.

code_change(_, #state{waiting = WaitingList} = State, _) when is_list(WaitingList) ->
    {ok, State#state{waiting = from_list(WaitingList)}};
code_change(_, State, _) ->
    {ok, State}.

maybe_resubmit(State, #job{name = DbName, node = Node} = Job) ->
    case lists:member(DbName, local_dbs()) of
        true ->
            case find_next_node() of
                Node ->
                    add_to_queue(State, Job);
                _ ->
                    % don't resubmit b/c we have a new replication target
                    State
            end;
        false ->
            add_to_queue(State, Job)
    end.
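%% Called whenever a replication worker exits: the job is removed from the
%% active list and, if a slot is free, the next waiting job that is not
%% already running is started in its place.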
handle_replication_exit(State, Pid) ->
    #state{active = Active, limit = Limit, dict = D, waiting = Waiting} = State,
    Active1 = lists:keydelete(Pid, #job.pid, Active),
    case is_empty(Waiting) of
        true ->
            {noreply, State#state{active = Active1, count = length(Active1)}};
        _ ->
            Count = length(Active1),
            NewState =
                if
                    Count < Limit ->
                        case next_replication(Active1, Waiting, queue:new()) of
                            % all waiting replications are also active
                            nil ->
                                State#state{active = Active1, count = Count};
                            {#job{name = DbName, node = Node} = Job, StillWaiting} ->
                                NewPid = start_push_replication(Job),
                                State#state{
                                    active = [Job#job{pid = NewPid} | Active1],
                                    count = Count + 1,
                                    dict = dict:erase({DbName, Node}, D),
                                    waiting = StillWaiting
                                }
                        end;
                    true ->
                        State#state{active = Active1, count = Count}
                end,
            {noreply, NewState}
    end.

start_push_replication(#job{name = Name, node = Node, pid = From}) ->
    if
        From =/= nil -> gen_server:reply(From, ok);
        true -> ok
    end,
    spawn_link(fun() ->
        case mem3_rep:go(Name, maybe_redirect(Node)) of
            {ok, Pending} when Pending > 0 ->
                exit({pending_changes, Pending});
            _ ->
                ok
        end
    end).

add_to_queue(State, #job{name = DbName, node = Node, pid = From} = Job) ->
    #state{dict = D, waiting = WQ} = State,
    case dict:is_key({DbName, Node}, D) of
        true ->
            if
                From =/= nil -> gen_server:reply(From, ok);
                true -> ok
            end,
            State;
        false ->
            couch_log:debug("adding ~s -> ~p to mem3_sync queue", [DbName, Node]),
            State#state{
                dict = dict:store({DbName, Node}, ok, D),
                waiting = in(Job, WQ)
            }
    end.

sync_nodes_and_dbs() ->
    Node = find_next_node(),
    [push(Db, Node) || Db <- local_dbs()].

initial_sync() ->
    mem3_sync_nodes:add(nodes()).

initial_sync(Live) ->
    sync_nodes_and_dbs(),
    Acc = {node(), Live, []},
    {_, _, Shards} = mem3_shards:fold(fun initial_sync_fold/2, Acc),
    submit_replication_tasks(node(), Live, Shards).

initial_sync_fold(#shard{dbname = Db} = Shard, {LocalNode, Live, AccShards}) ->
    case AccShards of
        [#shard{dbname = AccDb} | _] when Db =/= AccDb ->
            submit_replication_tasks(LocalNode, Live, AccShards),
            {LocalNode, Live, [Shard]};
        _ ->
            {LocalNode, Live, [Shard | AccShards]}
    end.

submit_replication_tasks(LocalNode, Live, Shards) ->
    SplitFun = fun(#shard{node = Node}) -> Node =:= LocalNode end,
    {Local, Remote} = lists:partition(SplitFun, Shards),
    lists:foreach(
        fun(#shard{name = ShardName}) ->
            [
                sync_push(ShardName, N)
             || #shard{node = N, name = Name} <- Remote,
                Name =:= ShardName,
                lists:member(N, Live)
            ]
        end,
        Local
    ).

sync_push(ShardName, N) ->
    gen_server:call(mem3_sync, {push, #job{name = ShardName, node = N}}, infinity).

% Pick the next live cluster node after this one in sorted order, wrapping
% around to the first node when this one is last.
find_next_node() ->
    LiveNodes = [node() | nodes()],
    AllNodes0 = lists:sort(mem3:nodes()),
    AllNodes1 = [X || X <- AllNodes0, lists:member(X, LiveNodes)],
    AllNodes = AllNodes1 ++ [hd(AllNodes1)],
    [_Self, Next | _] = lists:dropwhile(fun(N) -> N =/= node() end, AllNodes),
    Next.

%% @doc Finds the next {DbName,Node} pair in the list of waiting replications
%% which does not correspond to an already running replication
-spec next_replication([#job{}], queue:queue(_), queue:queue(_)) ->
    {#job{}, queue:queue(_)} | nil.
next_replication(Active, Waiting, WaitingAndRunning) ->
    case is_empty(Waiting) of
        true ->
            nil;
        false ->
            {{value, #job{name = S, node = N} = Job}, RemQ} = out(Waiting),
            case is_running(S, N, Active) of
                true ->
                    next_replication(Active, RemQ, in(Job, WaitingAndRunning));
                false ->
                    {Job, join(RemQ, WaitingAndRunning)}
            end
    end.

is_running(DbName, Node, ActiveList) ->
    [] =/= [true || #job{name = S, node = N} <- ActiveList, S =:= DbName, N =:= Node].
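%% Drop the given jobs' {DbName, Node} keys from the de-duplication dict.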
remove_entries(Dict, Entries) ->
    lists:foldl(
        fun(#job{name = S, node = N}, D) ->
            dict:erase({S, N}, D)
        end,
        Dict,
        Entries
    ).

local_dbs() ->
    UsersDb = users_db(),
    % users db might not have been created so don't include it unless it exists
    case couch_server:exists(UsersDb) of
        true ->
            [nodes_db(), shards_db(), UsersDb];
        false ->
            [nodes_db(), shards_db()]
    end.

nodes_db() ->
    ?l2b(config:get("mem3", "nodes_db", "_nodes")).

shards_db() ->
    ?l2b(config:get("mem3", "shards_db", "_dbs")).

users_db() ->
    ?l2b(config:get("couch_httpd_auth", "authentication_db", "_users")).

maybe_redirect(Node) ->
    case config:get("mem3.redirects", atom_to_list(Node)) of
        undefined ->
            Node;
        Redirect ->
            couch_log:debug("Redirecting push from ~p to ~p", [Node, Redirect]),
            list_to_existing_atom(Redirect)
    end.