diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2018-12-20 12:11:10 -0500 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2018-12-20 15:41:55 -0500 |
commit | 88dd1255595b513ff778e5efa4b2399aa3ccb570 (patch) | |
tree | 7ec874423fa1af68772eb4ee630083bd9ab2cfc3 | |
parent | f60f7a1c66a9b238ae66798b82058a4d9dc82731 (diff) | |
download | couchdb-88dd1255595b513ff778e5efa4b2399aa3ccb570.tar.gz |
Move fabric streams to a fabric_streams module
Streams functionality is fairly isolated from the rest of the utils module, so
move it to its own module. This is mostly in preparation for adding a cleaner
process for stream workers.
-rw-r--r-- | src/couch_replicator/src/couch_replicator_fabric.erl | 4 | ||||
-rw-r--r-- | src/fabric/src/fabric_streams.erl | 119 | ||||
-rw-r--r-- | src/fabric/src/fabric_util.erl | 88 | ||||
-rw-r--r-- | src/fabric/src/fabric_view_all_docs.erl | 4 | ||||
-rw-r--r-- | src/fabric/src/fabric_view_changes.erl | 4 | ||||
-rw-r--r-- | src/fabric/src/fabric_view_map.erl | 4 | ||||
-rw-r--r-- | src/fabric/src/fabric_view_reduce.erl | 4 |
7 files changed, 129 insertions, 98 deletions
diff --git a/src/couch_replicator/src/couch_replicator_fabric.erl b/src/couch_replicator/src/couch_replicator_fabric.erl index 6998b2803..1650105b5 100644 --- a/src/couch_replicator/src/couch_replicator_fabric.erl +++ b/src/couch_replicator/src/couch_replicator_fabric.erl @@ -27,12 +27,12 @@ docs(DbName, Options, QueryArgs, Callback, Acc) -> Shards, couch_replicator_fabric_rpc, docs, [Options, QueryArgs]), RexiMon = fabric_util:create_monitors(Workers0), try - case fabric_util:stream_start(Workers0, #shard.ref) of + case fabric_streams:start(Workers0, #shard.ref) of {ok, Workers} -> try docs_int(DbName, Workers, QueryArgs, Callback, Acc) after - fabric_util:cleanup(Workers) + fabric_streams:cleanup(Workers) end; {timeout, NewState} -> DefunctWorkers = fabric_util:remove_done_workers( diff --git a/src/fabric/src/fabric_streams.erl b/src/fabric/src/fabric_streams.erl new file mode 100644 index 000000000..32217c3cf --- /dev/null +++ b/src/fabric/src/fabric_streams.erl @@ -0,0 +1,119 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_streams). + +-export([ + start/2, + start/4, + cleanup/1 +]). + +-include_lib("fabric/include/fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). + + +start(Workers, Keypos) -> + start(Workers, Keypos, undefined, undefined). 
+ +start(Workers0, Keypos, StartFun, Replacements) -> + Fun = fun handle_stream_start/3, + Acc = #stream_acc{ + workers = fabric_dict:init(Workers0, waiting), + start_fun = StartFun, + replacements = Replacements + }, + Timeout = fabric_util:request_timeout(), + case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of + {ok, #stream_acc{workers=Workers}} -> + true = fabric_view:is_progress_possible(Workers), + AckedWorkers = fabric_dict:fold(fun(Worker, From, WorkerAcc) -> + rexi:stream_start(From), + [Worker | WorkerAcc] + end, [], Workers), + {ok, AckedWorkers}; + Else -> + Else + end. + + +cleanup(Workers) -> + fabric_util:cleanup(Workers). + + +handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, St) -> + case fabric_util:remove_down_workers(St#stream_acc.workers, NodeRef) of + {ok, Workers} -> + {ok, St#stream_acc{workers=Workers}}; + error -> + Reason = {nodedown, <<"progress not possible">>}, + {error, Reason} + end; + +handle_stream_start({rexi_EXIT, Reason}, Worker, St) -> + Workers = fabric_dict:erase(Worker, St#stream_acc.workers), + Replacements = St#stream_acc.replacements, + case {fabric_view:is_progress_possible(Workers), Reason} of + {true, _} -> + {ok, St#stream_acc{workers=Workers}}; + {false, {maintenance_mode, _Node}} when Replacements /= undefined -> + % Check if we have replacements for this range + % and start the new workers if so. + case lists:keytake(Worker#shard.range, 1, Replacements) of + {value, {_Range, WorkerReplacements}, NewReplacements} -> + FinalWorkers = lists:foldl(fun(Repl, NewWorkers) -> + NewWorker = (St#stream_acc.start_fun)(Repl), + fabric_dict:store(NewWorker, waiting, NewWorkers) + end, Workers, WorkerReplacements), + % Assert that our replaced worker provides us + % the oppurtunity to make progress. 
+ true = fabric_view:is_progress_possible(FinalWorkers), + NewRefs = fabric_dict:fetch_keys(FinalWorkers), + {new_refs, NewRefs, St#stream_acc{ + workers=FinalWorkers, + replacements=NewReplacements + }}; + false -> + % If we progress isn't possible and we don't have any + % replacements then we're dead in the water. + Error = {nodedown, <<"progress not possible">>}, + {error, Error} + end; + {false, _} -> + {error, fabric_util:error_info(Reason)} + end; + +handle_stream_start(rexi_STREAM_INIT, {Worker, From}, St) -> + case fabric_dict:lookup_element(Worker, St#stream_acc.workers) of + undefined -> + % This worker lost the race with other partition copies, terminate + rexi:stream_cancel(From), + {ok, St}; + waiting -> + % Don't ack the worker yet so they don't start sending us + % rows until we're ready + Workers0 = fabric_dict:store(Worker, From, St#stream_acc.workers), + Workers1 = fabric_view:remove_overlapping_shards(Worker, Workers0), + case fabric_dict:any(waiting, Workers1) of + true -> + {ok, St#stream_acc{workers=Workers1}}; + false -> + {stop, St#stream_acc{workers=Workers1}} + end + end; + +handle_stream_start({ok, ddoc_updated}, _, St) -> + cleanup(St#stream_acc.workers), + {stop, ddoc_updated}; + +handle_stream_start(Else, _, _) -> + exit({invalid_stream_start, Else}). diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl index e622c6aa0..cc1f1b622 100644 --- a/src/fabric/src/fabric_util.erl +++ b/src/fabric/src/fabric_util.erl @@ -16,7 +16,6 @@ update_counter/3, remove_ancestors/2, create_monitors/1, kv/2, remove_down_workers/2, doc_id_and_rev/1]). -export([request_timeout/0, attachments_timeout/0, all_docs_timeout/0]). --export([stream_start/2, stream_start/4]). -export([log_timeout/2, remove_done_workers/2]). -export([is_users_db/1, is_replicator_db/1, fake_db/2]). -export([upgrade_mrargs/1]). 
@@ -51,93 +50,6 @@ submit_jobs(Shards, Module, EndPoint, ExtraArgs) -> cleanup(Workers) -> [rexi:kill(Node, Ref) || #shard{node=Node, ref=Ref} <- Workers]. -stream_start(Workers, Keypos) -> - stream_start(Workers, Keypos, undefined, undefined). - -stream_start(Workers0, Keypos, StartFun, Replacements) -> - Fun = fun handle_stream_start/3, - Acc = #stream_acc{ - workers = fabric_dict:init(Workers0, waiting), - start_fun = StartFun, - replacements = Replacements - }, - Timeout = request_timeout(), - case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of - {ok, #stream_acc{workers=Workers}} -> - true = fabric_view:is_progress_possible(Workers), - AckedWorkers = fabric_dict:fold(fun(Worker, From, WorkerAcc) -> - rexi:stream_start(From), - [Worker | WorkerAcc] - end, [], Workers), - {ok, AckedWorkers}; - Else -> - Else - end. - -handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, St) -> - case fabric_util:remove_down_workers(St#stream_acc.workers, NodeRef) of - {ok, Workers} -> - {ok, St#stream_acc{workers=Workers}}; - error -> - Reason = {nodedown, <<"progress not possible">>}, - {error, Reason} - end; -handle_stream_start({rexi_EXIT, Reason}, Worker, St) -> - Workers = fabric_dict:erase(Worker, St#stream_acc.workers), - Replacements = St#stream_acc.replacements, - case {fabric_view:is_progress_possible(Workers), Reason} of - {true, _} -> - {ok, St#stream_acc{workers=Workers}}; - {false, {maintenance_mode, _Node}} when Replacements /= undefined -> - % Check if we have replacements for this range - % and start the new workers if so. - case lists:keytake(Worker#shard.range, 1, Replacements) of - {value, {_Range, WorkerReplacements}, NewReplacements} -> - FinalWorkers = lists:foldl(fun(Repl, NewWorkers) -> - NewWorker = (St#stream_acc.start_fun)(Repl), - fabric_dict:store(NewWorker, waiting, NewWorkers) - end, Workers, WorkerReplacements), - % Assert that our replaced worker provides us - % the oppurtunity to make progress. 
- true = fabric_view:is_progress_possible(FinalWorkers), - NewRefs = fabric_dict:fetch_keys(FinalWorkers), - {new_refs, NewRefs, St#stream_acc{ - workers=FinalWorkers, - replacements=NewReplacements - }}; - false -> - % If we progress isn't possible and we don't have any - % replacements then we're dead in the water. - Error = {nodedown, <<"progress not possible">>}, - {error, Error} - end; - {false, _} -> - {error, fabric_util:error_info(Reason)} - end; -handle_stream_start(rexi_STREAM_INIT, {Worker, From}, St) -> - case fabric_dict:lookup_element(Worker, St#stream_acc.workers) of - undefined -> - % This worker lost the race with other partition copies, terminate - rexi:stream_cancel(From), - {ok, St}; - waiting -> - % Don't ack the worker yet so they don't start sending us - % rows until we're ready - Workers0 = fabric_dict:store(Worker, From, St#stream_acc.workers), - Workers1 = fabric_view:remove_overlapping_shards(Worker, Workers0), - case fabric_dict:any(waiting, Workers1) of - true -> - {ok, St#stream_acc{workers=Workers1}}; - false -> - {stop, St#stream_acc{workers=Workers1}} - end - end; -handle_stream_start({ok, ddoc_updated}, _, St) -> - cleanup(St#stream_acc.workers), - {stop, ddoc_updated}; -handle_stream_start(Else, _, _) -> - exit({invalid_stream_start, Else}). - recv(Workers, Keypos, Fun, Acc0) -> rexi_utils:recv(Workers, Keypos, Fun, Acc0, request_timeout(), infinity). 
diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl index 30c8e8d51..a404125fa 100644 --- a/src/fabric/src/fabric_view_all_docs.erl +++ b/src/fabric/src/fabric_view_all_docs.erl @@ -26,12 +26,12 @@ go(DbName, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) -> Shards, fabric_rpc, all_docs, [Options, QueryArgs]), RexiMon = fabric_util:create_monitors(Workers0), try - case fabric_util:stream_start(Workers0, #shard.ref) of + case fabric_streams:start(Workers0, #shard.ref) of {ok, Workers} -> try go(DbName, Options, Workers, QueryArgs, Callback, Acc) after - fabric_util:cleanup(Workers) + fabric_streams:cleanup(Workers) end; {timeout, NewState} -> DefunctWorkers = fabric_util:remove_done_workers( diff --git a/src/fabric/src/fabric_view_changes.erl b/src/fabric/src/fabric_view_changes.erl index 7288f1aa5..f96bb058d 100644 --- a/src/fabric/src/fabric_view_changes.erl +++ b/src/fabric/src/fabric_view_changes.erl @@ -166,7 +166,7 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) -> end, RexiMon = fabric_util:create_monitors(Workers0), try - case fabric_util:stream_start(Workers0, #shard.ref, StartFun, Repls) of + case fabric_streams:start(Workers0, #shard.ref, StartFun, Repls) of {ok, Workers} -> try LiveSeqs = lists:map(fun(W) -> @@ -178,7 +178,7 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) -> send_changes(DbName, Workers, LiveSeqs, ChangesArgs, Callback, AccIn, Timeout) after - fabric_util:cleanup(Workers) + fabric_streams:cleanup(Workers) end; {timeout, NewState} -> DefunctWorkers = fabric_util:remove_done_workers( diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl index b6a3d6f83..ee51bfe74 100644 --- a/src/fabric/src/fabric_view_map.erl +++ b/src/fabric/src/fabric_view_map.erl @@ -36,14 +36,14 @@ go(DbName, Options, DDoc, View, Args, Callback, Acc, VInfo) -> Workers0 = fabric_util:submit_jobs(Shards, fabric_rpc, map_view, 
RPCArgs), RexiMon = fabric_util:create_monitors(Workers0), try - case fabric_util:stream_start(Workers0, #shard.ref, StartFun, Repls) of + case fabric_streams:start(Workers0, #shard.ref, StartFun, Repls) of {ok, ddoc_updated} -> Callback({error, ddoc_updated}, Acc); {ok, Workers} -> try go(DbName, Workers, VInfo, Args, Callback, Acc) after - fabric_util:cleanup(Workers) + fabric_streams:cleanup(Workers) end; {timeout, NewState} -> DefunctWorkers = fabric_util:remove_done_workers( diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl index a74be1073..b2b8a05f0 100644 --- a/src/fabric/src/fabric_view_reduce.erl +++ b/src/fabric/src/fabric_view_reduce.erl @@ -35,14 +35,14 @@ go(DbName, DDoc, VName, Args, Callback, Acc, VInfo) -> Workers0 = fabric_util:submit_jobs(Shards,fabric_rpc,reduce_view,RPCArgs), RexiMon = fabric_util:create_monitors(Workers0), try - case fabric_util:stream_start(Workers0, #shard.ref, StartFun, Repls) of + case fabric_streams:start(Workers0, #shard.ref, StartFun, Repls) of {ok, ddoc_updated} -> Callback({error, ddoc_updated}, Acc); {ok, Workers} -> try go2(DbName, Workers, VInfo, Args, Callback, Acc) after - fabric_util:cleanup(Workers) + fabric_streams:cleanup(Workers) end; {timeout, NewState} -> DefunctWorkers = fabric_util:remove_done_workers( |