diff options
Diffstat (limited to 'src/mem3/src/mem3_sync.erl')
-rw-r--r-- | src/mem3/src/mem3_sync.erl | 323 |
1 file changed, 0 insertions, 323 deletions
diff --git a/src/mem3/src/mem3_sync.erl b/src/mem3/src/mem3_sync.erl deleted file mode 100644 index 8170f3c1a..000000000 --- a/src/mem3/src/mem3_sync.erl +++ /dev/null @@ -1,323 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(mem3_sync). --behaviour(gen_server). --vsn(1). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). - --export([start_link/0, get_active/0, get_queue/0, push/1, push/2, - remove_node/1, remove_shard/1, initial_sync/1, get_backlog/0, nodes_db/0, - shards_db/0, users_db/0, find_next_node/0]). --export([ - local_dbs/0 -]). - --import(queue, [in/2, out/1, to_list/1, join/2, from_list/1, is_empty/1]). - --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - --record(state, { - active = [], - count = 0, - limit, - dict = dict:new(), - waiting = queue:new() -}). - --record(job, {name, node, count=nil, pid=nil}). - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - -get_active() -> - gen_server:call(?MODULE, get_active). - -get_queue() -> - gen_server:call(?MODULE, get_queue). - -get_backlog() -> - gen_server:call(?MODULE, get_backlog). - -push(#shard{name = Name}, Target) -> - push(Name, Target); -push(Name, #shard{node=Node}) -> - push(Name, Node); -push(Name, Node) -> - push(#job{name = Name, node = Node}). - -push(#job{node = Node} = Job) when Node =/= node() -> - gen_server:cast(?MODULE, {push, Job}); -push(_) -> - ok. 
%% Drop all queued jobs for Node and kill any running workers pushing to it.
remove_node(Node) ->
    gen_server:cast(?MODULE, {remove_node, Node}).

%% Drop all queued jobs for Shard and kill any running workers pushing it.
remove_shard(Shard) ->
    gen_server:cast(?MODULE, {remove_shard, Shard}).

init([]) ->
    process_flag(trap_exit, true),
    % Concurrency cap comes from config as a string; parsed below.
    Concurrency = config:get("mem3", "sync_concurrency", "10"),
    gen_event:add_handler(mem3_events, mem3_sync_event, []),
    initial_sync(),
    {ok, #state{limit = list_to_integer(Concurrency)}}.

%% A synchronous push stores the caller's From tag in #job.pid so the worker
%% path can reply once the job is accepted; delegate to the cast handler.
handle_call({push, Job}, From, State) ->
    handle_cast({push, Job#job{pid = From}}, State);

handle_call(get_active, _From, State) ->
    {reply, State#state.active, State};

handle_call(get_queue, _From, State) ->
    {reply, to_list(State#state.waiting), State};

handle_call(get_backlog, _From, #state{active = A, waiting = WQ} = State) ->
    % Sum pending-changes counters over both running and queued jobs,
    % skipping jobs whose count is still nil.
    ActiveBacklog = lists:sum([C || #job{count = C} <- A, is_integer(C)]),
    WaitingBacklog = lists:sum([C || #job{count = C} <- to_list(WQ), is_integer(C)]),
    {reply, ActiveBacklog + WaitingBacklog, State}.

handle_cast({push, DbName, Node}, State) ->
    handle_cast({push, #job{name = DbName, node = Node}}, State);

%% At the concurrency limit: queue the job instead of starting it.
handle_cast({push, Job}, #state{count = Count, limit = Limit} = State)
        when Count >= Limit ->
    {noreply, add_to_queue(State, Job)};

handle_cast({push, Job}, State) ->
    #state{active = Active, count = Count} = State,
    #job{name = DbName, node = Node} = Job,
    case is_running(DbName, Node, Active) of
        true ->
            % A replication for this shard/target is already in flight;
            % queue the request so it runs again afterwards.
            {noreply, add_to_queue(State, Job)};
        false ->
            Pid = start_push_replication(Job),
            {noreply, State#state{active = [Job#job{pid = Pid} | Active],
                                  count = Count + 1}}
    end;

handle_cast({remove_node, Node}, #state{waiting = Waiting0} = State) ->
    OtherNode = fun(#job{node = N}) -> N =/= Node end,
    {Alive, Dead} = lists:partition(OtherNode, to_list(Waiting0)),
    Dict = remove_entries(State#state.dict, Dead),
    % die_now tells the EXIT handler not to retry these workers.
    [exit(Pid, die_now) || #job{node = N, pid = Pid} <- State#state.active,
        N =:= Node],
    {noreply, State#state{dict = Dict, waiting = from_list(Alive)}};

handle_cast({remove_shard, Shard}, #state{waiting = Waiting0} = State) ->
    OtherShard = fun(#job{name = S}) -> S =/= Shard end,
    {Alive, Dead} = lists:partition(OtherShard, to_list(Waiting0)),
    Dict = remove_entries(State#state.dict, Dead),
    [exit(Pid, die_now) || #job{name = S, pid = Pid} <- State#state.active,
        S =:= Shard],
    {noreply, State#state{dict = Dict, waiting = from_list(Alive)}}.

handle_info({'EXIT', Active, normal}, State) ->
    handle_replication_exit(State, Active);

handle_info({'EXIT', Active, die_now}, State) ->
    % we forced this one ourselves, do not retry
    handle_replication_exit(State, Active);

handle_info({'EXIT', Active, {{not_found, no_db_file}, _Stack}}, State) ->
    % target doesn't exist, do not retry
    handle_replication_exit(State, Active);

handle_info({'EXIT', Active, Reason}, State) ->
    % Unexpected worker death: log it, then either resubmit (worker exited
    % with a pending-changes backlog) or schedule a delayed retry if the
    % shard still exists.
    NewState = case lists:keyfind(Active, #job.pid, State#state.active) of
        #job{name = OldDbName, node = OldNode} = Job ->
            couch_log:warning("~s ~s ~s ~w",
                [?MODULE, OldDbName, OldNode, Reason]),
            case Reason of
                {pending_changes, Count} ->
                    maybe_resubmit(State, Job#job{pid = nil, count = Count});
                _ ->
                    case mem3:db_is_current(Job#job.name) of
                        true ->
                            timer:apply_after(5000, ?MODULE, push,
                                [Job#job{pid = nil}]);
                        false ->
                            % no need to retry (db deleted or recreated)
                            ok
                    end,
                    State
            end;
        false ->
            State
    end,
    handle_replication_exit(NewState, Active);

handle_info(Msg, State) ->
    couch_log:notice("unexpected msg at replication manager ~p", [Msg]),
    {noreply, State}.

terminate(_Reason, State) ->
    [exit(Pid, shutdown) || #job{pid = Pid} <- State#state.active],
    ok.

%% Upgrade path from a version that kept `waiting` as a plain list.
code_change(_, #state{waiting = WaitingList} = State, _)
        when is_list(WaitingList) ->
    {ok, State#state{waiting = from_list(WaitingList)}};

code_change(_, State, _) ->
    {ok, State}.

%% Decide whether a job that exited with pending changes should be requeued.
%% Internal dbs are only requeued if this node's replication target has not
%% changed in the meantime; shard jobs are always requeued.
maybe_resubmit(State, #job{name = DbName, node = Node} = Job) ->
    case lists:member(DbName, local_dbs()) of
        true ->
            case find_next_node() of
                Node ->
                    add_to_queue(State, Job);
                _ ->
                    State % don't resubmit b/c we have a new replication target
            end;
        false ->
            add_to_queue(State, Job)
    end.
- -handle_replication_exit(State, Pid) -> - #state{active=Active, limit=Limit, dict=D, waiting=Waiting} = State, - Active1 = lists:keydelete(Pid, #job.pid, Active), - case is_empty(Waiting) of - true -> - {noreply, State#state{active=Active1, count=length(Active1)}}; - _ -> - Count = length(Active1), - NewState = if Count < Limit -> - case next_replication(Active1, Waiting, queue:new()) of - nil -> % all waiting replications are also active - State#state{active = Active1, count = Count}; - {#job{name=DbName, node=Node} = Job, StillWaiting} -> - NewPid = start_push_replication(Job), - State#state{ - active = [Job#job{pid = NewPid} | Active1], - count = Count+1, - dict = dict:erase({DbName,Node}, D), - waiting = StillWaiting - } - end; - true -> - State#state{active = Active1, count=Count} - end, - {noreply, NewState} - end. - -start_push_replication(#job{name=Name, node=Node, pid=From}) -> - if From =/= nil -> gen_server:reply(From, ok); true -> ok end, - spawn_link(fun() -> - case mem3_rep:go(Name, maybe_redirect(Node)) of - {ok, Pending} when Pending > 0 -> - exit({pending_changes, Pending}); - _ -> - ok - end - end). - -add_to_queue(State, #job{name=DbName, node=Node, pid=From} = Job) -> - #state{dict=D, waiting=WQ} = State, - case dict:is_key({DbName, Node}, D) of - true -> - if From =/= nil -> gen_server:reply(From, ok); true -> ok end, - State; - false -> - couch_log:debug("adding ~s -> ~p to mem3_sync queue", [DbName, Node]), - State#state{ - dict = dict:store({DbName,Node}, ok, D), - waiting = in(Job, WQ) - } - end. - -sync_nodes_and_dbs() -> - Node = find_next_node(), - [push(Db, Node) || Db <- local_dbs()]. - -initial_sync() -> - [net_kernel:connect_node(Node) || Node <- mem3:nodes()], - mem3_sync_nodes:add(nodes()). - -initial_sync(Live) -> - sync_nodes_and_dbs(), - Acc = {node(), Live, []}, - {_, _, Shards} = mem3_shards:fold(fun initial_sync_fold/2, Acc), - submit_replication_tasks(node(), Live, Shards). 
- -initial_sync_fold(#shard{dbname = Db} = Shard, {LocalNode, Live, AccShards}) -> - case AccShards of - [#shard{dbname = AccDb} | _] when Db =/= AccDb -> - submit_replication_tasks(LocalNode, Live, AccShards), - {LocalNode, Live, [Shard]}; - _ -> - {LocalNode, Live, [Shard|AccShards]} - end. - -submit_replication_tasks(LocalNode, Live, Shards) -> - SplitFun = fun(#shard{node = Node}) -> Node =:= LocalNode end, - {Local, Remote} = lists:partition(SplitFun, Shards), - lists:foreach(fun(#shard{name = ShardName}) -> - [sync_push(ShardName, N) || #shard{node=N, name=Name} <- Remote, - Name =:= ShardName, lists:member(N, Live)] - end, Local). - -sync_push(ShardName, N) -> - gen_server:call(mem3_sync, {push, #job{name=ShardName, node=N}}, infinity). - - - -find_next_node() -> - LiveNodes = [node()|nodes()], - AllNodes0 = lists:sort(mem3:nodes()), - AllNodes1 = [X || X <- AllNodes0, lists:member(X, LiveNodes)], - AllNodes = AllNodes1 ++ [hd(AllNodes1)], - [_Self, Next| _] = lists:dropwhile(fun(N) -> N =/= node() end, AllNodes), - Next. - -%% @doc Finds the next {DbName,Node} pair in the list of waiting replications -%% which does not correspond to an already running replication --spec next_replication([#job{}], queue:queue(_), queue:queue(_)) -> - {#job{}, queue:queue(_)} | nil. -next_replication(Active, Waiting, WaitingAndRunning) -> - case is_empty(Waiting) of - true -> - nil; - false -> - {{value, #job{name=S, node=N} = Job}, RemQ} = out(Waiting), - case is_running(S,N,Active) of - true -> - next_replication(Active, RemQ, in(Job, WaitingAndRunning)); - false -> - {Job, join(RemQ, WaitingAndRunning)} - end - end. - -is_running(DbName, Node, ActiveList) -> - [] =/= [true || #job{name=S, node=N} <- ActiveList, S=:=DbName, N=:=Node]. - -remove_entries(Dict, Entries) -> - lists:foldl(fun(#job{name=S, node=N}, D) -> - dict:erase({S, N}, D) - end, Dict, Entries). - -local_dbs() -> - [nodes_db(), shards_db(), users_db()]. 
- -nodes_db() -> - ?l2b(config:get("mem3", "nodes_db", "_nodes")). - -shards_db() -> - ?l2b(config:get("mem3", "shards_db", "_dbs")). - -users_db() -> - ?l2b(config:get("couch_httpd_auth", "authentication_db", "_users")). - -maybe_redirect(Node) -> - case config:get("mem3.redirects", atom_to_list(Node)) of - undefined -> - Node; - Redirect -> - couch_log:debug("Redirecting push from ~p to ~p", [Node, Redirect]), - list_to_existing_atom(Redirect) - end. |