diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2019-02-14 18:39:02 -0500 |
---|---|---|
committer | Nick Vatamaniuc <vatamane@apache.org> | 2019-02-15 16:42:43 -0500 |
commit | 95b8080d5dfcc27225555e57c4e2568bd9301835 (patch) | |
tree | b8ca907453048d746010286e645795cf403cd1a8 | |
parent | 49acd7341d3435cf3e086e1d9c2a82b5b54c0bad (diff) | |
download | couchdb-95b8080d5dfcc27225555e57c4e2568bd9301835.tar.gz |
Shard map updating now goes through a gen_server
-rw-r--r-- | src/mem3/src/mem3_reshard_dbdoc.erl | 261 | ||||
-rw-r--r-- | src/mem3/src/mem3_reshard_sup.erl | 6 | ||||
-rw-r--r-- | src/mem3/src/mem3_rpc.erl | 16 |
3 files changed, 157 insertions, 126 deletions
diff --git a/src/mem3/src/mem3_reshard_dbdoc.erl b/src/mem3/src/mem3_reshard_dbdoc.erl index d6914040f..8dd6f1b62 100644 --- a/src/mem3/src/mem3_reshard_dbdoc.erl +++ b/src/mem3/src/mem3_reshard_dbdoc.erl @@ -12,10 +12,19 @@ -module(mem3_reshard_dbdoc). +-behaviour(gen_server). -export([ update_shard_map/1, - update_shard_map_rpc/3 + + start_link/0, + + init/1, + terminate/2, + handle_call/3, + handle_info/2, + handle_cast/2, + code_change/3 ]). @@ -23,98 +32,134 @@ -include("mem3_reshard.hrl"). +-spec update_shard_map(#job{}) -> no_return | ok. update_shard_map(#job{source = Source, target = Target} = Job) -> - couch_log:info("~p : replicating dbs to local node", [?MODULE]), - case mem3_util:replicate_dbs_from_all_nodes(600000) of - ok -> - ok; - Error -> - exit(Error) + Node = hd(mem3_util:live_nodes()), + JobStr = mem3_reshard_job:jobfmt(Job), + LogMsg1 = "~p : calling update_shard_map on node ~p. ~p", + couch_log:notice(LogMsg1, [?MODULE, Node, JobStr]), + ServerRef = {?MODULE, Node}, + CallArg = {update_shard_map, Source, Target}, + TimeoutMSec = shard_update_timeout_msec(), + try + case gen_server:call(ServerRef, CallArg, TimeoutMSec) of + {ok, _} -> ok; + {error, CallError} -> throw({error, CallError}) + end + catch + _:Err -> + exit({shard_update_error, Err}) end, + LogMsg2 = "~p : update_shard_map on node returned. ~p", + couch_log:notice(LogMsg2, [?MODULE, Node]), + UntilSec = mem3_reshard:now_sec() + (TimeoutMSec div 1000), + case wait_source_removed(Source, 5, UntilSec) of + true -> ok; + false -> exit(shard_update_did_not_propagate) + end. + + +-spec start_link() -> {ok, pid()} | ignore | {error, term()}. +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + + +init(_) -> + couch_log:notice("~p start init()", [?MODULE]), + {ok, nil}. + + +terminate(_Reason, _State) -> + ok. + + +handle_call({update_shard_map, Source, Target}, _From, State) -> + Res = try + update_shard_map(Source, Target) + catch + throw:{error, Error} -> + {error, Error} + end, + {reply, Res, State}; + +handle_call(Call, From, State) -> + couch_log:error("~p unknown call ~p from: ~p", [?MODULE, Call, From]), + {noreply, State}. + + +handle_cast(Cast, State) -> + couch_log:error("~p unexpected cast ~p", [?MODULE, Cast]), + {noreply, State}. + + +handle_info(Info, State) -> + couch_log:error("~p unexpected info ~p", [?MODULE, Info]), + {noreply, State}. + + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + + +% Private + +update_shard_map(Source, Target) -> + ok = validate_coordinator(), + ok = replicate_from_all_nodes(shard_update_timeout_msec()), #shard{name = SourceName} = Source, DocId = mem3:dbname(SourceName), - OldBody = case mem3_util:open_db_doc(DocId) of - {ok, #doc{body = DocBody}} -> - DocBody; + OldDoc = case mem3_util:open_db_doc(DocId) of + {ok, #doc{deleted = true}} -> + throw({error, missing_source}); + {ok, #doc{} = Doc} -> + Doc; {not_found, deleted} -> - exit({error, missing_source}); + throw({error, missing_source}); OpenErr -> - exit({shard_doc_open_error, OpenErr}) + throw({error, {shard_doc_open_error, OpenErr}}) end, - Node = hd(mem3_util:live_nodes()), - Body = update_shard_props(OldBody, Source, Target), - JobStr = mem3_reshard_job:jobfmt(Job), - LogArgs1 = [?MODULE, JobStr, Node, OldBody, Body], - couch_log:notice("~p : ~s node:~p doc ~p -> ~p", LogArgs1), - case mem3_rpc:update_shard_map(Node, DocId, OldBody, Body, 600000) of - {ok, Res} -> - LogArgs2 = [?MODULE, JobStr, Node, Res], - couch_log:notice("~p : ~s node:~p updated ~p", LogArgs2), - UntilSec = mem3_reshard:now_sec() + 600, - case wait_source_removed(Source, 5, UntilSec) of - true -> - ok; - false -> - exit({shard_update_did_not_propagate, Source}) - end; - UpdateErr -> - ErrArgs = [?MODULE, JobStr, Node, UpdateErr], - couch_log:error("~p : ~s node:~p error:~p", ErrArgs), - exit({shard_doc_update_error, UpdateErr}) - end. + #doc{body = OldBody} = OldDoc, + NewBody = update_shard_props(OldBody, Source, Target), + {ok, _} = write_shard_doc(OldDoc, NewBody), + ok = replicate_to_all_nodes(shard_update_timeout_msec()), + {ok, NewBody}. -update_shard_map_rpc(DocId, OldBody, {[_ | _]} = Body) -> - RepRes = mem3_util:replicate_dbs_from_all_nodes(600000), - CurrNode = hd(mem3_util:live_nodes()), - % Make sure we are still the coordinator - case {node() == CurrNode, RepRes} of - {true, ok} -> - case mem3_util:open_db_doc(DocId) of - {ok, Doc} -> - replicate_changes(write_shard_doc(Doc, OldBody, Body)); - Error -> - Error - end; - {false, _} -> - {error, {coordinator_changed, node(), CurrNode}}; - {_, {error, RepError}} -> - {error, RepError} - end. +validate_coordinator() -> + case hd(mem3_util:live_nodes()) =:= node() of + true -> ok; + false -> throw({error, coordinator_changed}) + end. -% Private - -wait_source_removed(#shard{name = Name} = Source, SleepSec, UntilSec) -> - case check_source_removed(Source) of - true -> - true; - false -> - case mem3_reshard:now_sec() < UntilSec of - true -> - LogMsg = "~p : Waiting for shard ~p removal confirmation", - couch_log:notice(LogMsg, [?MODULE, Name]), - timer:sleep(SleepSec * 1000), - wait_source_removed(Source, SleepSec, UntilSec); - false -> - false - end +replicate_from_all_nodes(TimeoutMSec) -> + case mem3_util:replicate_dbs_from_all_nodes(TimeoutMSec) of + ok -> ok; + Error -> throw({error, Error}) end. -check_source_removed(#shard{name = Name}) -> - DbName = mem3:dbname(Name), - Live = mem3_util:live_nodes(), - ShardNodes = [N || #shard{node = N} <- mem3:shards(DbName)], - Nodes = lists:usort([N || N <- ShardNodes, lists:member(N, Live)]), - {Responses, _} = rpc:multicall(Nodes, mem3, shards, [DbName]), - Shards = lists:usort(lists:flatten(Responses)), - case [S || S = #shard{name = S, node = N} <- Shards, S =:= Name, - N =:= node()] of - [] -> true; - [_ | _] -> false + +replicate_to_all_nodes(TimeoutMSec) -> + case mem3_util:replicate_dbs_to_all_nodes(TimeoutMSec) of + ok -> ok; + Error -> throw({error, Error}) end. +write_shard_doc(#doc{id = Id} = Doc, Body) -> + DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")), + UpdatedDoc = Doc#doc{body = Body}, + couch_util:with_db(DbName, fun(Db) -> + try + {ok, _} = couch_db:update_doc(Db, UpdatedDoc, []) + catch + conflict -> + throw({error, {conflict, Id, Doc#doc.body, UpdatedDoc}}) + end + end). + + update_shard_props({Props0}, #shard{} = Source, [#shard{} | _] = Targets) -> {ByNode0} = couch_util:get_value(<<"by_node">>, Props0, {[]}), ByNodeKV = {<<"by_node">>, {update_by_node(ByNode0, Source, Targets)}}, @@ -154,7 +199,7 @@ remove_node_from_source(ByRange, Source) -> true -> ok; false -> - exit({source_shard_missing_node, NodeKey, SourceNodes}) + throw({source_shard_missing_node, NodeKey, SourceNodes}) end, SourceNodes1 = SourceNodes -- [NodeKey], case SourceNodes1 of @@ -175,7 +220,7 @@ add_node_to_target_foldl(#shard{} = Target, ByRange) -> false -> ok; true -> - exit({target_shard_already_has_node, NodeKey, Nodes}) + throw({target_shard_already_has_node, NodeKey, Nodes}) end, Nodes1 = lists:sort([NodeKey | Nodes]), lists:keyreplace(TKey, 1, ByRange, {TKey, Nodes1}); @@ -194,40 +239,36 @@ range_key(#shard{range = [B, E]}) -> list_to_binary([BHex, "-", EHex]). -write_shard_doc(#doc{deleted = true, id = DocId}, _, _) -> - {error, {shard_doc_deleted, DocId}}; +shard_update_timeout_msec() -> + config:get_integer("reshard", "shard_upate_timeout_msec", 300000). + -write_shard_doc(#doc{body = CurrOldBody} = Doc, OldBody, Body) -> - % Check when writing that the document body looks exactly like - % the version the modifications were based off to start with - SameBody = couch_util:ejsort(CurrOldBody) == couch_util:ejsort(OldBody), - case SameBody of +wait_source_removed(#shard{name = Name} = Source, SleepSec, UntilSec) -> + case check_source_removed(Source) of true -> - DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")), - UpdatedDoc = Doc#doc{body = Body}, - couch_util:with_db(DbName, fun(Db) -> - try - couch_db:update_doc(Db, UpdatedDoc, []) - catch - conflict -> - {error, {conflict, CurrOldBody, UpdatedDoc}} - end - end); + true; false -> - {error, {conflict, CurrOldBody, OldBody}} + case mem3_reshard:now_sec() < UntilSec of + true -> + LogMsg = "~p : Waiting for shard ~p removal confirmation", + couch_log:notice(LogMsg, [?MODULE, Name]), + timer:sleep(SleepSec * 1000), + wait_source_removed(Source, SleepSec, UntilSec); + false -> + false + end end. -replicate_changes({ok, Res}) -> - couch_log:notice("~p : shard doc written, start replication", [?MODULE]), - case mem3_util:replicate_dbs_to_all_nodes(300000) of - ok -> - couch_log:notice("~p : dbs replicated successfully", [?MODULE]), - {ok, Res}; - {error, Error} -> - couch_log:error("~p : dbs replication error: ~p", [?MODULE, Error]), - {error, Error} - end; - -replicate_changes(Error) -> - Error. +check_source_removed(#shard{name = Name}) -> + DbName = mem3:dbname(Name), + Live = mem3_util:live_nodes(), + ShardNodes = [N || #shard{node = N} <- mem3:shards(DbName)], + Nodes = lists:usort([N || N <- ShardNodes, lists:member(N, Live)]), + {Responses, _} = rpc:multicall(Nodes, mem3, shards, [DbName]), + Shards = lists:usort(lists:flatten(Responses)), + case [S || S = #shard{name = S, node = N} <- Shards, S =:= Name, + N =:= node()] of + [] -> true; + [_ | _] -> false + end. diff --git a/src/mem3/src/mem3_reshard_sup.erl b/src/mem3/src/mem3_reshard_sup.erl index 9d3d78ef7..264ae4796 100644 --- a/src/mem3/src/mem3_reshard_sup.erl +++ b/src/mem3/src/mem3_reshard_sup.erl @@ -24,6 +24,12 @@ start_link() -> init(_Args) -> Children = [ + {mem3_reshard_dbdoc, + {mem3_reshard_dbdoc, start_link, []}, + permanent, + infinity, + worker, + [mem3_reshard_dbdoc]}, {mem3_reshard_job_sup, {mem3_reshard_job_sup, start_link, []}, permanent, diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl index a5cd6bb2a..272037e9d 100644 --- a/src/mem3/src/mem3_rpc.erl +++ b/src/mem3/src/mem3_rpc.erl @@ -28,7 +28,6 @@ save_purge_checkpoint/4, purge_docs/4, - update_shard_map/5, replicate/4 ]). @@ -43,7 +42,6 @@ load_purge_infos_rpc/3, save_purge_checkpoint_rpc/3, - update_shard_map_rpc/3, replicate_rpc/2 ]). @@ -100,11 +98,6 @@ purge_docs(Node, DbName, PurgeInfos, Options) -> rexi_call(Node, {fabric_rpc, purge_docs, [DbName, PurgeInfos, Options]}). -update_shard_map(Node, DocId, OldBody, Body, Timeout) -> - Args = [DocId, OldBody, Body], - rexi_call(Node, {mem3_rpc, update_shard_map_rpc, Args}, Timeout). - - replicate(Source, Target, DbName, Timeout) when is_atom(Source), is_atom(Target), is_binary(DbName) -> Args = [DbName, Target], @@ -237,15 +230,6 @@ save_purge_checkpoint_rpc(DbName, PurgeDocId, Body) -> end. -update_shard_map_rpc(DocId, OldBody, Body) -> - rexi:reply(try - {ok, mem3_reshard_dbdoc:update_shard_map_rpc(DocId, OldBody, Body)} - catch - Tag:Error -> - {Tag, Error} - end). - - replicate_rpc(DbName, Target) -> rexi:reply(try Opts = [{batch_size, 1000}, {batch_count, all}], |