summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Nick Vatamaniuc <vatamane@apache.org> 2019-02-14 18:39:02 -0500
committer Nick Vatamaniuc <vatamane@apache.org> 2019-02-15 16:42:43 -0500
commit 95b8080d5dfcc27225555e57c4e2568bd9301835 (patch)
tree b8ca907453048d746010286e645795cf403cd1a8
parent 49acd7341d3435cf3e086e1d9c2a82b5b54c0bad (diff)
download couchdb-95b8080d5dfcc27225555e57c4e2568bd9301835.tar.gz
Shard map updating now goes through a gen_server
-rw-r--r-- src/mem3/src/mem3_reshard_dbdoc.erl 261
-rw-r--r-- src/mem3/src/mem3_reshard_sup.erl 6
-rw-r--r-- src/mem3/src/mem3_rpc.erl 16
3 files changed, 157 insertions, 126 deletions
diff --git a/src/mem3/src/mem3_reshard_dbdoc.erl b/src/mem3/src/mem3_reshard_dbdoc.erl
index d6914040f..8dd6f1b62 100644
--- a/src/mem3/src/mem3_reshard_dbdoc.erl
+++ b/src/mem3/src/mem3_reshard_dbdoc.erl
@@ -12,10 +12,19 @@
-module(mem3_reshard_dbdoc).
+-behaviour(gen_server).
-export([
update_shard_map/1,
- update_shard_map_rpc/3
+
+ start_link/0,
+
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_info/2,
+ handle_cast/2,
+ code_change/3
]).
@@ -23,98 +32,134 @@
-include("mem3_reshard.hrl").
+%% Asks the mem3_reshard_dbdoc server on the first node returned by
+%% mem3_util:live_nodes/0 to rewrite the shard map for the job's source and
+%% target shards, then polls with wait_source_removed/3 until the deadline.
+%%
+%% Returns ok on success. Exits with {shard_update_error, Err} when the
+%% gen_server call raises or replies {error, _}, and with
+%% shard_update_did_not_propagate when the poll deadline passes.
+%%
+%% The spec uses the built-in type no_return(); a bare `no_return` would be
+%% read as an atom literal by the type checker.
+-spec update_shard_map(#job{}) -> ok | no_return().
update_shard_map(#job{source = Source, target = Target} = Job) ->
- couch_log:info("~p : replicating dbs to local node", [?MODULE]),
- case mem3_util:replicate_dbs_from_all_nodes(600000) of
- ok ->
- ok;
- Error ->
- exit(Error)
+ Node = hd(mem3_util:live_nodes()),
+ JobStr = mem3_reshard_job:jobfmt(Job),
+ LogMsg1 = "~p : calling update_shard_map on node ~p. ~p",
+ couch_log:notice(LogMsg1, [?MODULE, Node, JobStr]),
+ ServerRef = {?MODULE, Node},
+ CallArg = {update_shard_map, Source, Target},
+ TimeoutMSec = shard_update_timeout_msec(),
+ try
+ case gen_server:call(ServerRef, CallArg, TimeoutMSec) of
+ {ok, _} -> ok;
+ {error, CallError} -> throw({error, CallError}) % caught just below and turned into an exit
+ end
+ catch
+ _:Err ->
+ exit({shard_update_error, Err})
end,
+ LogMsg2 = "~p : update_shard_map on node returned. ~p",
+ couch_log:notice(LogMsg2, [?MODULE, Node]),
+ UntilSec = mem3_reshard:now_sec() + (TimeoutMSec div 1000), % poll deadline in seconds
+ case wait_source_removed(Source, 5, UntilSec) of
+ true -> ok;
+ false -> exit(shard_update_did_not_propagate)
+ end.
+
+
+-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
+start_link() -> % Starts the gen_server and registers it locally under the module name.
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+init(_) -> % gen_server init callback: logs the start and uses the atom nil as server state.
+ couch_log:notice("~p start init()", [?MODULE]),
+ {ok, nil}.
+
+
+terminate(_Reason, _State) -> % gen_server terminate callback: performs no cleanup.
+ ok.
+
+
+%% gen_server call handler.
+%%
+%% {update_shard_map, Source, Target}: runs update_shard_map/2 inside the
+%% server process and replies with its result. Any throw raised during the
+%% update becomes an {error, Reason} reply. Helpers in this module throw both
+%% {error, _} tuples and untagged terms (for example
+%% {source_shard_missing_node, _, _}); catching only the tagged form would let
+%% the untagged ones crash the server with a nocatch error.
+%%
+%% Any other request is logged and answered with {error, {unknown_call, Call}}
+%% so that the caller is not left blocked until its call timeout expires.
+handle_call({update_shard_map, Source, Target}, _From, State) ->
+    Res = try
+        update_shard_map(Source, Target)
+    catch
+        throw:{error, Error} ->
+            {error, Error};
+        throw:Thrown ->
+            {error, Thrown}
+    end,
+    {reply, Res, State};
+
+handle_call(Call, From, State) ->
+    couch_log:error("~p unknown call ~p from: ~p", [?MODULE, Call, From]),
+    {reply, {error, {unknown_call, Call}}, State}.
+
+
+handle_cast(Cast, State) -> % No cast is handled by this server: log it and keep the state.
+ couch_log:error("~p unexpected cast ~p", [?MODULE, Cast]),
+ {noreply, State}.
+
+
+handle_info(Info, State) -> % No raw message is handled by this server: log it and keep the state.
+ couch_log:error("~p unexpected info ~p", [?MODULE, Info]),
+ {noreply, State}.
+
+
+code_change(_OldVsn, State, _Extra) -> % Code upgrade callback: the state is passed through unchanged.
+ {ok, State}.
+
+
+
+% Private
+
+update_shard_map(Source, Target) -> % Server-side update: returns {ok, NewBody}; failures are raised as throws.
+ ok = validate_coordinator(), % throws {error, coordinator_changed} unless this node heads the live node list
+ ok = replicate_from_all_nodes(shard_update_timeout_msec()), % runs before the shard doc is opened
#shard{name = SourceName} = Source,
DocId = mem3:dbname(SourceName),
- OldBody = case mem3_util:open_db_doc(DocId) of
- {ok, #doc{body = DocBody}} ->
- DocBody;
+ OldDoc = case mem3_util:open_db_doc(DocId) of
+ {ok, #doc{deleted = true}} -> % a deleted shard doc is reported the same way as a missing one
+ throw({error, missing_source});
+ {ok, #doc{} = Doc} ->
+ Doc;
{not_found, deleted} ->
- exit({error, missing_source});
+ throw({error, missing_source});
OpenErr ->
- exit({shard_doc_open_error, OpenErr})
+ throw({error, {shard_doc_open_error, OpenErr}})
end,
- Node = hd(mem3_util:live_nodes()),
- Body = update_shard_props(OldBody, Source, Target),
- JobStr = mem3_reshard_job:jobfmt(Job),
- LogArgs1 = [?MODULE, JobStr, Node, OldBody, Body],
- couch_log:notice("~p : ~s node:~p doc ~p -> ~p", LogArgs1),
- case mem3_rpc:update_shard_map(Node, DocId, OldBody, Body, 600000) of
- {ok, Res} ->
- LogArgs2 = [?MODULE, JobStr, Node, Res],
- couch_log:notice("~p : ~s node:~p updated ~p", LogArgs2),
- UntilSec = mem3_reshard:now_sec() + 600,
- case wait_source_removed(Source, 5, UntilSec) of
- true ->
- ok;
- false ->
- exit({shard_update_did_not_propagate, Source})
- end;
- UpdateErr ->
- ErrArgs = [?MODULE, JobStr, Node, UpdateErr],
- couch_log:error("~p : ~s node:~p error:~p", ErrArgs),
- exit({shard_doc_update_error, UpdateErr})
- end.
+ #doc{body = OldBody} = OldDoc,
+ NewBody = update_shard_props(OldBody, Source, Target),
+ {ok, _} = write_shard_doc(OldDoc, NewBody), % the opened doc is reused and only its body is replaced
+ ok = replicate_to_all_nodes(shard_update_timeout_msec()), % runs after the doc has been written
+ {ok, NewBody}.
-update_shard_map_rpc(DocId, OldBody, {[_ | _]} = Body) ->
- RepRes = mem3_util:replicate_dbs_from_all_nodes(600000),
- CurrNode = hd(mem3_util:live_nodes()),
- % Make sure we are still the coordinator
- case {node() == CurrNode, RepRes} of
- {true, ok} ->
- case mem3_util:open_db_doc(DocId) of
- {ok, Doc} ->
- replicate_changes(write_shard_doc(Doc, OldBody, Body));
- Error ->
- Error
- end;
- {false, _} ->
- {error, {coordinator_changed, node(), CurrNode}};
- {_, {error, RepError}} ->
- {error, RepError}
- end.
+validate_coordinator() -> % Returns ok only when node() is the head of mem3_util:live_nodes().
+ case hd(mem3_util:live_nodes()) =:= node() of
+ true -> ok;
+ false -> throw({error, coordinator_changed}) % thrown so that handle_call/3 can reply with the error
+ end.
-% Private
-
-wait_source_removed(#shard{name = Name} = Source, SleepSec, UntilSec) ->
- case check_source_removed(Source) of
- true ->
- true;
- false ->
- case mem3_reshard:now_sec() < UntilSec of
- true ->
- LogMsg = "~p : Waiting for shard ~p removal confirmation",
- couch_log:notice(LogMsg, [?MODULE, Name]),
- timer:sleep(SleepSec * 1000),
- wait_source_removed(Source, SleepSec, UntilSec);
- false ->
- false
- end
+%% Calls mem3_util:replicate_dbs_from_all_nodes/1 with the given timeout.
+%% Returns ok, or throws {error, Reason} on any other result. A result that
+%% is already an {error, Reason} tuple is thrown as is rather than being
+%% wrapped a second time into {error, {error, Reason}}.
+replicate_from_all_nodes(TimeoutMSec) ->
+    case mem3_util:replicate_dbs_from_all_nodes(TimeoutMSec) of
+        ok -> ok;
+        {error, Reason} -> throw({error, Reason});
+        Error -> throw({error, Error})
end.
-check_source_removed(#shard{name = Name}) ->
- DbName = mem3:dbname(Name),
- Live = mem3_util:live_nodes(),
- ShardNodes = [N || #shard{node = N} <- mem3:shards(DbName)],
- Nodes = lists:usort([N || N <- ShardNodes, lists:member(N, Live)]),
- {Responses, _} = rpc:multicall(Nodes, mem3, shards, [DbName]),
- Shards = lists:usort(lists:flatten(Responses)),
- case [S || S = #shard{name = S, node = N} <- Shards, S =:= Name,
- N =:= node()] of
- [] -> true;
- [_ | _] -> false
+
+%% Calls mem3_util:replicate_dbs_to_all_nodes/1 with the given timeout.
+%% Returns ok, or throws {error, Reason} on any other result. A result that
+%% is already an {error, Reason} tuple is thrown as is rather than being
+%% wrapped a second time into {error, {error, Reason}}.
+replicate_to_all_nodes(TimeoutMSec) ->
+    case mem3_util:replicate_dbs_to_all_nodes(TimeoutMSec) of
+        ok -> ok;
+        {error, Reason} -> throw({error, Reason});
+        Error -> throw({error, Error})
end.
+write_shard_doc(#doc{id = Id} = Doc, Body) -> % Writes Doc with its body replaced by Body; returns {ok, _} or throws on conflict.
+ DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")), % shards db name comes from config, default "_dbs"
+ UpdatedDoc = Doc#doc{body = Body},
+ couch_util:with_db(DbName, fun(Db) ->
+ try
+ {ok, _} = couch_db:update_doc(Db, UpdatedDoc, []) % any other result fails the match
+ catch
+ conflict -> % a thrown conflict is re-thrown with the doc id, the old body and the attempted doc
+ throw({error, {conflict, Id, Doc#doc.body, UpdatedDoc}})
+ end
+ end).
+
+
update_shard_props({Props0}, #shard{} = Source, [#shard{} | _] = Targets) ->
{ByNode0} = couch_util:get_value(<<"by_node">>, Props0, {[]}),
ByNodeKV = {<<"by_node">>, {update_by_node(ByNode0, Source, Targets)}},
@@ -154,7 +199,7 @@ remove_node_from_source(ByRange, Source) ->
true ->
ok;
false ->
- exit({source_shard_missing_node, NodeKey, SourceNodes})
+ throw({source_shard_missing_node, NodeKey, SourceNodes})
end,
SourceNodes1 = SourceNodes -- [NodeKey],
case SourceNodes1 of
@@ -175,7 +220,7 @@ add_node_to_target_foldl(#shard{} = Target, ByRange) ->
false ->
ok;
true ->
- exit({target_shard_already_has_node, NodeKey, Nodes})
+ throw({target_shard_already_has_node, NodeKey, Nodes})
end,
Nodes1 = lists:sort([NodeKey | Nodes]),
lists:keyreplace(TKey, 1, ByRange, {TKey, Nodes1});
@@ -194,40 +239,36 @@ range_key(#shard{range = [B, E]}) ->
list_to_binary([BHex, "-", EHex]).
-write_shard_doc(#doc{deleted = true, id = DocId}, _, _) ->
- {error, {shard_doc_deleted, DocId}};
+%% Timeout, in milliseconds, used for the shard map update call and for the
+%% replication steps around it. Read from the "reshard" config section,
+%% defaulting to 300000.
+%%
+%% The key was originally misspelled as "shard_upate_timeout_msec". The
+%% correctly spelled key now takes precedence, and the misspelled one is still
+%% honoured as a fallback so existing configurations keep working.
+shard_update_timeout_msec() ->
+    Legacy = config:get_integer("reshard", "shard_upate_timeout_msec", 300000),
+    config:get_integer("reshard", "shard_update_timeout_msec", Legacy).
+
-write_shard_doc(#doc{body = CurrOldBody} = Doc, OldBody, Body) ->
- % Check when writing that the document body looks exactly like
- % the version the modifications were based off to start with
- SameBody = couch_util:ejsort(CurrOldBody) == couch_util:ejsort(OldBody),
- case SameBody of
+wait_source_removed(#shard{name = Name} = Source, SleepSec, UntilSec) -> % Polls check_source_removed/1; true once it succeeds, false after the deadline.
+ case check_source_removed(Source) of
true ->
- DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")),
- UpdatedDoc = Doc#doc{body = Body},
- couch_util:with_db(DbName, fun(Db) ->
- try
- couch_db:update_doc(Db, UpdatedDoc, [])
- catch
- conflict ->
- {error, {conflict, CurrOldBody, UpdatedDoc}}
- end
- end);
+ true;
false ->
- {error, {conflict, CurrOldBody, OldBody}}
+ case mem3_reshard:now_sec() < UntilSec of % UntilSec is compared against mem3_reshard:now_sec()
+ true ->
+ LogMsg = "~p : Waiting for shard ~p removal confirmation",
+ couch_log:notice(LogMsg, [?MODULE, Name]),
+ timer:sleep(SleepSec * 1000), % SleepSec is in seconds; timer:sleep/1 takes milliseconds
+ wait_source_removed(Source, SleepSec, UntilSec);
+ false ->
+ false
+ end
end.
-replicate_changes({ok, Res}) ->
- couch_log:notice("~p : shard doc written, start replication", [?MODULE]),
- case mem3_util:replicate_dbs_to_all_nodes(300000) of
- ok ->
- couch_log:notice("~p : dbs replicated successfully", [?MODULE]),
- {ok, Res};
- {error, Error} ->
- couch_log:error("~p : dbs replication error: ~p", [?MODULE, Error]),
- {error, Error}
- end;
-
-replicate_changes(Error) ->
- Error.
+%% Returns true when none of the live nodes that host shards of the source
+%% database still report a shard with the source's name on this node, and
+%% false otherwise.
+%%
+%% Each live shard node is asked for mem3:shards(DbName) via rpc:multicall/4
+%% and the answers are merged. Entries that are not #shard{} records (such as
+%% failed rpc results) are dropped by the generator pattern.
+%%
+%% The shard name is bound to its own variable. Binding one variable to both
+%% the whole record and its name field can never match, which made the check
+%% always report the source as removed.
+check_source_removed(#shard{name = Name}) ->
+    DbName = mem3:dbname(Name),
+    Live = mem3_util:live_nodes(),
+    ShardNodes = [N || #shard{node = N} <- mem3:shards(DbName)],
+    Nodes = lists:usort([N || N <- ShardNodes, lists:member(N, Live)]),
+    {Responses, _} = rpc:multicall(Nodes, mem3, shards, [DbName]),
+    Shards = lists:usort(lists:flatten(Responses)),
+    case [S || #shard{name = SName, node = N} = S <- Shards, SName =:= Name,
+            N =:= node()] of
+        [] -> true;
+        [_ | _] -> false
+    end.
diff --git a/src/mem3/src/mem3_reshard_sup.erl b/src/mem3/src/mem3_reshard_sup.erl
index 9d3d78ef7..264ae4796 100644
--- a/src/mem3/src/mem3_reshard_sup.erl
+++ b/src/mem3/src/mem3_reshard_sup.erl
@@ -24,6 +24,12 @@ start_link() ->
init(_Args) ->
Children = [
+ {mem3_reshard_dbdoc,
+ {mem3_reshard_dbdoc, start_link, []},
+ permanent,
+ infinity,
+ worker,
+ [mem3_reshard_dbdoc]},
{mem3_reshard_job_sup,
{mem3_reshard_job_sup, start_link, []},
permanent,
diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl
index a5cd6bb2a..272037e9d 100644
--- a/src/mem3/src/mem3_rpc.erl
+++ b/src/mem3/src/mem3_rpc.erl
@@ -28,7 +28,6 @@
save_purge_checkpoint/4,
purge_docs/4,
- update_shard_map/5,
replicate/4
]).
@@ -43,7 +42,6 @@
load_purge_infos_rpc/3,
save_purge_checkpoint_rpc/3,
- update_shard_map_rpc/3,
replicate_rpc/2
]).
@@ -100,11 +98,6 @@ purge_docs(Node, DbName, PurgeInfos, Options) ->
rexi_call(Node, {fabric_rpc, purge_docs, [DbName, PurgeInfos, Options]}).
-update_shard_map(Node, DocId, OldBody, Body, Timeout) ->
- Args = [DocId, OldBody, Body],
- rexi_call(Node, {mem3_rpc, update_shard_map_rpc, Args}, Timeout).
-
-
replicate(Source, Target, DbName, Timeout)
when is_atom(Source), is_atom(Target), is_binary(DbName) ->
Args = [DbName, Target],
@@ -237,15 +230,6 @@ save_purge_checkpoint_rpc(DbName, PurgeDocId, Body) ->
end.
-update_shard_map_rpc(DocId, OldBody, Body) ->
- rexi:reply(try
- {ok, mem3_reshard_dbdoc:update_shard_map_rpc(DocId, OldBody, Body)}
- catch
- Tag:Error ->
- {Tag, Error}
- end).
-
-
replicate_rpc(DbName, Target) ->
rexi:reply(try
Opts = [{batch_size, 1000}, {batch_count, all}],