diff options
Diffstat (limited to 'src/mem3/src/mem3_reshard_dbdoc.erl')
-rw-r--r-- | src/mem3/src/mem3_reshard_dbdoc.erl | 255 |
1 files changed, 0 insertions, 255 deletions
% Recovered from: diff --git a/src/mem3/src/mem3_reshard_dbdoc.erl
% b/src/mem3/src/mem3_reshard_dbdoc.erl (deleted file mode 100644,
% index 7fb69598e..000000000, @@ -1,255 +0,0 @@)

% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(mem3_reshard_dbdoc).

-behaviour(gen_server).

-export([
    update_shard_map/1,

    start_link/0,

    init/1,
    terminate/2,
    handle_call/3,
    handle_cast/2,
    handle_info/2,
    code_change/3
]).

-include_lib("couch/include/couch_db.hrl").
-include("mem3_reshard.hrl").

% Ask the coordinator node to rewrite the shard map for a split job, then
% wait until the source shard is no longer reported.
%
% The coordinator is the first entry of mem3_util:live_nodes(); the request
% is a gen_server:call to the ?MODULE server registered on that node, bounded
% by shard_update_timeout_msec().
%
% Returns `ok` on success. Never returns normally on failure: any error reply
% or exception from the call is re-raised as exit(Reason), and the process
% exits with `shard_update_did_not_propagate` if the source shard is still
% reported once the wait deadline passes.
-spec update_shard_map(#job{}) -> ok | no_return().
update_shard_map(#job{source = Source, target = Target} = Job) ->
    Node = hd(mem3_util:live_nodes()),
    JobStr = mem3_reshard_job:jobfmt(Job),
    LogMsg1 = "~p : ~p calling update_shard_map node:~p",
    couch_log:notice(LogMsg1, [?MODULE, JobStr, Node]),
    ServerRef = {?MODULE, Node},
    CallArg = {update_shard_map, Source, Target},
    TimeoutMSec = shard_update_timeout_msec(),
    try
        case gen_server:call(ServerRef, CallArg, TimeoutMSec) of
            {ok, _} -> ok;
            {error, CallError} -> throw({error, CallError})
        end
    catch
        % Every failure class (throw, error, exit - including call timeouts)
        % is normalised into an exit of the calling process.
        _:Err ->
            exit(Err)
    end,
    LogMsg2 = "~p : ~p update_shard_map on node:~p returned",
    couch_log:notice(LogMsg2, [?MODULE, JobStr, Node]),
    % Reuse the call timeout (converted to whole seconds) as the deadline for
    % the removal check, polling every 5 seconds.
    UntilSec = mem3_reshard:now_sec() + (TimeoutMSec div 1000),
    case wait_source_removed(Source, 5, UntilSec) of
        true -> ok;
        false -> exit(shard_update_did_not_propagate)
    end.

% Start the server and register it locally under the module name.
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

% The server carries no state of its own; `nil` is only a placeholder.
init(_Args) ->
    couch_log:notice("~p start init()", [?MODULE]),
    {ok, nil}.

terminate(_Reason, _State) ->
    ok.

% Serve shard map update requests. Anything else is logged and left
% unanswered.
handle_call({update_shard_map, Source, Target}, _From, State) ->
    {reply, try_update_shard_map(Source, Target), State};
handle_call(Call, From, State) ->
    couch_log:error("~p unknown call ~p from: ~p", [?MODULE, Call, From]),
    {noreply, State}.

handle_cast(Cast, State) ->
    couch_log:error("~p unexpected cast ~p", [?MODULE, Cast]),
    {noreply, State}.

handle_info(Info, State) ->
    couch_log:error("~p unexpected info ~p", [?MODULE, Info]),
    {noreply, State}.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

% Private

% Run the update and turn a thrown {error, Reason} into a return value.
% Other exceptions are not caught here.
try_update_shard_map(Source, Target) ->
    try
        update_shard_map(Source, Target)
    catch
        throw:{error, Reason} ->
            {error, Reason}
    end.

% Rewrite the shard map document of the source's database. Steps, in order:
% check this node is still the coordinator, pull dbs changes from all nodes,
% load the document, compute and write the new body, push dbs changes to all
% nodes. Returns {ok, NewBody}; failures are thrown as {error, Reason}.
update_shard_map(Source, Target) ->
    ok = validate_coordinator(),
    ok = replicate_from_all_nodes(shard_update_timeout_msec()),
    OldDoc = open_shard_doc(mem3:dbname(Source#shard.name)),
    #doc{body = OldBody} = OldDoc,
    NewBody = update_shard_props(OldBody, Source, Target),
    {ok, _} = write_shard_doc(OldDoc, NewBody),
    ok = replicate_to_all_nodes(shard_update_timeout_msec()),
    {ok, NewBody}.

% Load the shard map document, throwing {error, missing_source} when it is
% deleted or absent and {error, {shard_doc_open_error, _}} for any other
% unexpected result.
open_shard_doc(DocId) ->
    case mem3_util:open_db_doc(DocId) of
        {ok, #doc{deleted = true}} ->
            throw({error, missing_source});
        {not_found, deleted} ->
            throw({error, missing_source});
        {ok, #doc{} = Doc} ->
            Doc;
        Unexpected ->
            throw({error, {shard_doc_open_error, Unexpected}})
    end.

% The coordinator is whichever node is first in the live nodes list; throw
% if that is not the local node.
validate_coordinator() ->
    Coordinator = hd(mem3_util:live_nodes()),
    if
        Coordinator =:= node() -> ok;
        true -> throw({error, coordinator_changed})
    end.

replicate_from_all_nodes(TimeoutMSec) ->
    ok_or_throw(mem3_util:replicate_dbs_from_all_nodes(TimeoutMSec)).

replicate_to_all_nodes(TimeoutMSec) ->
    ok_or_throw(mem3_util:replicate_dbs_to_all_nodes(TimeoutMSec)).

% Pass `ok` through; wrap anything else as a thrown {error, Result}.
ok_or_throw(ok) ->
    ok;
ok_or_throw(Result) ->
    throw({error, Result}).

% Write Doc back to the shards db with Body as its new body.
% Returns the {ok, _} result of couch_db:update_doc/3. A thrown `conflict`
% is re-thrown as {error, {conflict, Id, OldBody, UpdatedDoc}}.
write_shard_doc(#doc{id = Id} = Doc, Body) ->
    UpdatedDoc = Doc#doc{body = Body},
    couch_util:with_db(mem3_sync:shards_db(), fun(Db) ->
        try
            {ok, _} = couch_db:update_doc(Db, UpdatedDoc, [])
        catch
            conflict ->
                throw({error, {conflict, Id, Doc#doc.body, UpdatedDoc}})
        end
    end).

% Compute the new shard map body for splitting Source into Targets.
%
% Takes the `{Props}` wrapped property list of the existing document and
% returns it in the same shape with three properties updated:
%   by_node   - the source range is replaced by the target ranges for the
%               source's node
%   by_range  - the source's node is removed from the source range and added
%               to every target range
%   changelog - a ["split", SourceRange, TargetRanges, Node] entry is appended
update_shard_props({Props0}, #shard{} = Source, [#shard{} | _] = Targets) ->
    {ByNode0} = couch_util:get_value(<<"by_node">>, Props0, {[]}),
    ByNodeKV = {<<"by_node">>, {update_by_node(ByNode0, Source, Targets)}},
    Props1 = lists:keyreplace(<<"by_node">>, 1, Props0, ByNodeKV),

    {ByRange0} = couch_util:get_value(<<"by_range">>, Props1, {[]}),
    ByRangeKV = {<<"by_range">>, {update_by_range(ByRange0, Source, Targets)}},
    Props2 = lists:keyreplace(<<"by_range">>, 1, Props1, ByRangeKV),

    Changelog = couch_util:get_value(<<"changelog">>, Props2, []),
    {Node, Range} = {node_key(Source), range_key(Source)},
    TRanges = [range_key(T) || T <- Targets],
    ChangelogEntry = [[<<"split">>, Range, TRanges, Node]],
    ChangelogKV = {<<"changelog">>, Changelog ++ ChangelogEntry},
    % keystore, not keyreplace: the changelog is read with a default of [],
    % so the key may be absent. keyreplace would then leave the property list
    % untouched and silently drop the new entry.
    Props3 = lists:keystore(<<"changelog">>, 1, Props2, ChangelogKV),

    {Props3}.

% In the by_node list, swap the source range for the target ranges under the
% source's node and keep that node's ranges sorted. Crashes with badmatch if
% the node has no entry.
update_by_node(ByNode, #shard{} = Source, [#shard{} | _] = Targets) ->
    {NodeKey, SKey} = {node_key(Source), range_key(Source)},
    {_, Ranges} = lists:keyfind(NodeKey, 1, ByNode),
    Ranges1 = Ranges -- [SKey],
    Ranges2 = Ranges1 ++ [range_key(T) || T <- Targets],
    lists:keyreplace(NodeKey, 1, ByNode, {NodeKey, lists:sort(Ranges2)}).

% In the by_range list, drop the source's node from the source range, then
% add it to each target range in turn.
update_by_range(ByRange, Source, Targets) ->
    ByRange1 = remove_node_from_source(ByRange, Source),
    lists:foldl(fun add_node_to_target_foldl/2, ByRange1, Targets).

% Remove the source's node from the source range entry of the by_range list.
% The entry itself is deleted when no nodes remain. Throws
% {source_shard_missing_node, NodeKey, Nodes} if the node was not listed, and
% crashes with badmatch if the range has no entry.
remove_node_from_source(ByRange, Source) ->
    NodeKey = node_key(Source),
    SKey = range_key(Source),
    {_, SourceNodes} = lists:keyfind(SKey, 1, ByRange),
    % Double check that source had node to begin with
    lists:member(NodeKey, SourceNodes) orelse
        throw({source_shard_missing_node, NodeKey, SourceNodes}),
    case lists:delete(NodeKey, SourceNodes) of
        [] ->
            % If last node deleted, remove entry
            lists:keydelete(SKey, 1, ByRange);
        Remaining ->
            lists:keyreplace(SKey, 1, ByRange, {SKey, Remaining})
    end.

% Fold step: add the target's node to the target range entry of the by_range
% list, creating the entry when the range is new. Throws
% {target_shard_already_has_node, NodeKey, Nodes} if the node is already
% listed for that range.
add_node_to_target_foldl(#shard{} = Target, ByRange) ->
    NodeKey = node_key(Target),
    TKey = range_key(Target),
    case lists:keyfind(TKey, 1, ByRange) of
        false ->
            % fabric_db_create:make_document/3 says they should be sorted
            lists:sort([{TKey, [NodeKey]} | ByRange]);
        {_, Nodes} ->
            % Double check that target does not have node already
            lists:member(NodeKey, Nodes) andalso
                throw({target_shard_already_has_node, NodeKey, Nodes}),
            Sorted = lists:sort([NodeKey | Nodes]),
            lists:keyreplace(TKey, 1, ByRange, {TKey, Sorted})
    end.

% Shard node as a binary, as used for keys in the shard map document.
node_key(#shard{node = Node}) ->
    couch_util:to_binary(Node).

% Shard range as a "<begin>-<end>" binary, each bound rendered by
% couch_util:to_hex/1 from its 32-bit encoding.
range_key(#shard{range = [B, E]}) ->
    Hex = fun(Bound) -> couch_util:to_hex(<<Bound:32/integer>>) end,
    Begin = Hex(B),
    End = Hex(E),
    list_to_binary([Begin, "-", End]).

% Timeout, in milliseconds, for shard map update steps (default 300000).
% NOTE(review): the config key is spelled "shard_upate_timeout_msec"; it looks
% like a typo for "update" but is kept as is since configs may rely on it.
shard_update_timeout_msec() ->
    config:get_integer("reshard", "shard_upate_timeout_msec", 300000).

% Poll check_source_removed/1 every SleepSec seconds until it reports true or
% mem3_reshard:now_sec() reaches UntilSec. Returns true when the source was
% seen removed, false when the deadline passed first.
wait_source_removed(#shard{} = Source, SleepSec, UntilSec) ->
    case check_source_removed(Source) of
        true -> true;
        false -> retry_wait_source_removed(Source, SleepSec, UntilSec)
    end.

% Sleep and poll again unless the deadline has already been reached.
retry_wait_source_removed(#shard{name = Name} = Source, SleepSec, UntilSec) ->
    case mem3_reshard:now_sec() >= UntilSec of
        true ->
            false;
        false ->
            LogMsg = "~p : Waiting for shard ~p removal confirmation",
            couch_log:notice(LogMsg, [?MODULE, Name]),
            timer:sleep(SleepSec * 1000),
            wait_source_removed(Source, SleepSec, UntilSec)
    end.

% Return true when none of the live nodes hosting shards of this database
% still reports the source shard on the local node, false otherwise.
%
% The shard lists are fetched with rpc:multicall/4 from every live node that
% appears in the local shard map for the database. Non-shard terms in the
% responses are skipped by the generator pattern below.
check_source_removed(#shard{name = Name}) ->
    DbName = mem3:dbname(Name),
    Live = mem3_util:live_nodes(),
    ShardNodes = [N || #shard{node = N} <- mem3:shards(DbName)],
    Nodes = lists:usort([N || N <- ShardNodes, lists:member(N, Live)]),
    {Responses, _} = rpc:multicall(Nodes, mem3, shards, [DbName]),
    Shards = lists:usort(lists:flatten(Responses)),
    % The shard and its name field need distinct variables. Binding both to
    % the same variable (S = #shard{name = S}) requires a record to equal its
    % own field, which never matches, so the list was always empty and this
    % function always returned true.
    SourcePresent = [
        Shard
     || #shard{name = ShardName, node = ShardNode} = Shard <- Shards,
        ShardName =:= Name,
        ShardNode =:= node()
    ],
    case SourcePresent of
        [] -> true;
        [_ | _] -> false
    end.