summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2018-12-20 12:11:10 -0500
committerNick Vatamaniuc <nickva@users.noreply.github.com>2018-12-20 15:41:55 -0500
commit88dd1255595b513ff778e5efa4b2399aa3ccb570 (patch)
tree7ec874423fa1af68772eb4ee630083bd9ab2cfc3
parentf60f7a1c66a9b238ae66798b82058a4d9dc82731 (diff)
downloadcouchdb-88dd1255595b513ff778e5efa4b2399aa3ccb570.tar.gz
Move fabric streams to a fabric_streams module
Streams functionality is fairly isolated from the rest of the utils module so move it to its own. This is mostly in preparation to add a streams workers cleaner process.
-rw-r--r--src/couch_replicator/src/couch_replicator_fabric.erl4
-rw-r--r--src/fabric/src/fabric_streams.erl119
-rw-r--r--src/fabric/src/fabric_util.erl88
-rw-r--r--src/fabric/src/fabric_view_all_docs.erl4
-rw-r--r--src/fabric/src/fabric_view_changes.erl4
-rw-r--r--src/fabric/src/fabric_view_map.erl4
-rw-r--r--src/fabric/src/fabric_view_reduce.erl4
7 files changed, 129 insertions, 98 deletions
diff --git a/src/couch_replicator/src/couch_replicator_fabric.erl b/src/couch_replicator/src/couch_replicator_fabric.erl
index 6998b2803..1650105b5 100644
--- a/src/couch_replicator/src/couch_replicator_fabric.erl
+++ b/src/couch_replicator/src/couch_replicator_fabric.erl
@@ -27,12 +27,12 @@ docs(DbName, Options, QueryArgs, Callback, Acc) ->
Shards, couch_replicator_fabric_rpc, docs, [Options, QueryArgs]),
RexiMon = fabric_util:create_monitors(Workers0),
try
- case fabric_util:stream_start(Workers0, #shard.ref) of
+ case fabric_streams:start(Workers0, #shard.ref) of
{ok, Workers} ->
try
docs_int(DbName, Workers, QueryArgs, Callback, Acc)
after
- fabric_util:cleanup(Workers)
+ fabric_streams:cleanup(Workers)
end;
{timeout, NewState} ->
DefunctWorkers = fabric_util:remove_done_workers(
diff --git a/src/fabric/src/fabric_streams.erl b/src/fabric/src/fabric_streams.erl
new file mode 100644
index 000000000..32217c3cf
--- /dev/null
+++ b/src/fabric/src/fabric_streams.erl
@@ -0,0 +1,119 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_streams).
+
+-export([
+ start/2,
+ start/4,
+ cleanup/1
+]).
+
+-include_lib("fabric/include/fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+
+start(Workers, Keypos) ->
+ start(Workers, Keypos, undefined, undefined).
+
+start(Workers0, Keypos, StartFun, Replacements) ->
+ Fun = fun handle_stream_start/3,
+ Acc = #stream_acc{
+ workers = fabric_dict:init(Workers0, waiting),
+ start_fun = StartFun,
+ replacements = Replacements
+ },
+ Timeout = fabric_util:request_timeout(),
+ case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of
+ {ok, #stream_acc{workers=Workers}} ->
+ true = fabric_view:is_progress_possible(Workers),
+ AckedWorkers = fabric_dict:fold(fun(Worker, From, WorkerAcc) ->
+ rexi:stream_start(From),
+ [Worker | WorkerAcc]
+ end, [], Workers),
+ {ok, AckedWorkers};
+ Else ->
+ Else
+ end.
+
+
+cleanup(Workers) ->
+ fabric_util:cleanup(Workers).
+
+
+handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, St) ->
+ case fabric_util:remove_down_workers(St#stream_acc.workers, NodeRef) of
+ {ok, Workers} ->
+ {ok, St#stream_acc{workers=Workers}};
+ error ->
+ Reason = {nodedown, <<"progress not possible">>},
+ {error, Reason}
+ end;
+
+handle_stream_start({rexi_EXIT, Reason}, Worker, St) ->
+ Workers = fabric_dict:erase(Worker, St#stream_acc.workers),
+ Replacements = St#stream_acc.replacements,
+ case {fabric_view:is_progress_possible(Workers), Reason} of
+ {true, _} ->
+ {ok, St#stream_acc{workers=Workers}};
+ {false, {maintenance_mode, _Node}} when Replacements /= undefined ->
+ % Check if we have replacements for this range
+ % and start the new workers if so.
+ case lists:keytake(Worker#shard.range, 1, Replacements) of
+ {value, {_Range, WorkerReplacements}, NewReplacements} ->
+ FinalWorkers = lists:foldl(fun(Repl, NewWorkers) ->
+ NewWorker = (St#stream_acc.start_fun)(Repl),
+ fabric_dict:store(NewWorker, waiting, NewWorkers)
+ end, Workers, WorkerReplacements),
+ % Assert that our replaced worker provides us
+ % the oppurtunity to make progress.
+ true = fabric_view:is_progress_possible(FinalWorkers),
+ NewRefs = fabric_dict:fetch_keys(FinalWorkers),
+ {new_refs, NewRefs, St#stream_acc{
+ workers=FinalWorkers,
+ replacements=NewReplacements
+ }};
+ false ->
+ % If we progress isn't possible and we don't have any
+ % replacements then we're dead in the water.
+ Error = {nodedown, <<"progress not possible">>},
+ {error, Error}
+ end;
+ {false, _} ->
+ {error, fabric_util:error_info(Reason)}
+ end;
+
+handle_stream_start(rexi_STREAM_INIT, {Worker, From}, St) ->
+ case fabric_dict:lookup_element(Worker, St#stream_acc.workers) of
+ undefined ->
+ % This worker lost the race with other partition copies, terminate
+ rexi:stream_cancel(From),
+ {ok, St};
+ waiting ->
+ % Don't ack the worker yet so they don't start sending us
+ % rows until we're ready
+ Workers0 = fabric_dict:store(Worker, From, St#stream_acc.workers),
+ Workers1 = fabric_view:remove_overlapping_shards(Worker, Workers0),
+ case fabric_dict:any(waiting, Workers1) of
+ true ->
+ {ok, St#stream_acc{workers=Workers1}};
+ false ->
+ {stop, St#stream_acc{workers=Workers1}}
+ end
+ end;
+
+handle_stream_start({ok, ddoc_updated}, _, St) ->
+ cleanup(St#stream_acc.workers),
+ {stop, ddoc_updated};
+
+handle_stream_start(Else, _, _) ->
+ exit({invalid_stream_start, Else}).
diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl
index e622c6aa0..cc1f1b622 100644
--- a/src/fabric/src/fabric_util.erl
+++ b/src/fabric/src/fabric_util.erl
@@ -16,7 +16,6 @@
update_counter/3, remove_ancestors/2, create_monitors/1, kv/2,
remove_down_workers/2, doc_id_and_rev/1]).
-export([request_timeout/0, attachments_timeout/0, all_docs_timeout/0]).
--export([stream_start/2, stream_start/4]).
-export([log_timeout/2, remove_done_workers/2]).
-export([is_users_db/1, is_replicator_db/1, fake_db/2]).
-export([upgrade_mrargs/1]).
@@ -51,93 +50,6 @@ submit_jobs(Shards, Module, EndPoint, ExtraArgs) ->
cleanup(Workers) ->
[rexi:kill(Node, Ref) || #shard{node=Node, ref=Ref} <- Workers].
-stream_start(Workers, Keypos) ->
- stream_start(Workers, Keypos, undefined, undefined).
-
-stream_start(Workers0, Keypos, StartFun, Replacements) ->
- Fun = fun handle_stream_start/3,
- Acc = #stream_acc{
- workers = fabric_dict:init(Workers0, waiting),
- start_fun = StartFun,
- replacements = Replacements
- },
- Timeout = request_timeout(),
- case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of
- {ok, #stream_acc{workers=Workers}} ->
- true = fabric_view:is_progress_possible(Workers),
- AckedWorkers = fabric_dict:fold(fun(Worker, From, WorkerAcc) ->
- rexi:stream_start(From),
- [Worker | WorkerAcc]
- end, [], Workers),
- {ok, AckedWorkers};
- Else ->
- Else
- end.
-
-handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, St) ->
- case fabric_util:remove_down_workers(St#stream_acc.workers, NodeRef) of
- {ok, Workers} ->
- {ok, St#stream_acc{workers=Workers}};
- error ->
- Reason = {nodedown, <<"progress not possible">>},
- {error, Reason}
- end;
-handle_stream_start({rexi_EXIT, Reason}, Worker, St) ->
- Workers = fabric_dict:erase(Worker, St#stream_acc.workers),
- Replacements = St#stream_acc.replacements,
- case {fabric_view:is_progress_possible(Workers), Reason} of
- {true, _} ->
- {ok, St#stream_acc{workers=Workers}};
- {false, {maintenance_mode, _Node}} when Replacements /= undefined ->
- % Check if we have replacements for this range
- % and start the new workers if so.
- case lists:keytake(Worker#shard.range, 1, Replacements) of
- {value, {_Range, WorkerReplacements}, NewReplacements} ->
- FinalWorkers = lists:foldl(fun(Repl, NewWorkers) ->
- NewWorker = (St#stream_acc.start_fun)(Repl),
- fabric_dict:store(NewWorker, waiting, NewWorkers)
- end, Workers, WorkerReplacements),
- % Assert that our replaced worker provides us
- % the oppurtunity to make progress.
- true = fabric_view:is_progress_possible(FinalWorkers),
- NewRefs = fabric_dict:fetch_keys(FinalWorkers),
- {new_refs, NewRefs, St#stream_acc{
- workers=FinalWorkers,
- replacements=NewReplacements
- }};
- false ->
- % If we progress isn't possible and we don't have any
- % replacements then we're dead in the water.
- Error = {nodedown, <<"progress not possible">>},
- {error, Error}
- end;
- {false, _} ->
- {error, fabric_util:error_info(Reason)}
- end;
-handle_stream_start(rexi_STREAM_INIT, {Worker, From}, St) ->
- case fabric_dict:lookup_element(Worker, St#stream_acc.workers) of
- undefined ->
- % This worker lost the race with other partition copies, terminate
- rexi:stream_cancel(From),
- {ok, St};
- waiting ->
- % Don't ack the worker yet so they don't start sending us
- % rows until we're ready
- Workers0 = fabric_dict:store(Worker, From, St#stream_acc.workers),
- Workers1 = fabric_view:remove_overlapping_shards(Worker, Workers0),
- case fabric_dict:any(waiting, Workers1) of
- true ->
- {ok, St#stream_acc{workers=Workers1}};
- false ->
- {stop, St#stream_acc{workers=Workers1}}
- end
- end;
-handle_stream_start({ok, ddoc_updated}, _, St) ->
- cleanup(St#stream_acc.workers),
- {stop, ddoc_updated};
-handle_stream_start(Else, _, _) ->
- exit({invalid_stream_start, Else}).
-
recv(Workers, Keypos, Fun, Acc0) ->
rexi_utils:recv(Workers, Keypos, Fun, Acc0, request_timeout(), infinity).
diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl
index 30c8e8d51..a404125fa 100644
--- a/src/fabric/src/fabric_view_all_docs.erl
+++ b/src/fabric/src/fabric_view_all_docs.erl
@@ -26,12 +26,12 @@ go(DbName, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) ->
Shards, fabric_rpc, all_docs, [Options, QueryArgs]),
RexiMon = fabric_util:create_monitors(Workers0),
try
- case fabric_util:stream_start(Workers0, #shard.ref) of
+ case fabric_streams:start(Workers0, #shard.ref) of
{ok, Workers} ->
try
go(DbName, Options, Workers, QueryArgs, Callback, Acc)
after
- fabric_util:cleanup(Workers)
+ fabric_streams:cleanup(Workers)
end;
{timeout, NewState} ->
DefunctWorkers = fabric_util:remove_done_workers(
diff --git a/src/fabric/src/fabric_view_changes.erl b/src/fabric/src/fabric_view_changes.erl
index 7288f1aa5..f96bb058d 100644
--- a/src/fabric/src/fabric_view_changes.erl
+++ b/src/fabric/src/fabric_view_changes.erl
@@ -166,7 +166,7 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
end,
RexiMon = fabric_util:create_monitors(Workers0),
try
- case fabric_util:stream_start(Workers0, #shard.ref, StartFun, Repls) of
+ case fabric_streams:start(Workers0, #shard.ref, StartFun, Repls) of
{ok, Workers} ->
try
LiveSeqs = lists:map(fun(W) ->
@@ -178,7 +178,7 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
send_changes(DbName, Workers, LiveSeqs, ChangesArgs,
Callback, AccIn, Timeout)
after
- fabric_util:cleanup(Workers)
+ fabric_streams:cleanup(Workers)
end;
{timeout, NewState} ->
DefunctWorkers = fabric_util:remove_done_workers(
diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl
index b6a3d6f83..ee51bfe74 100644
--- a/src/fabric/src/fabric_view_map.erl
+++ b/src/fabric/src/fabric_view_map.erl
@@ -36,14 +36,14 @@ go(DbName, Options, DDoc, View, Args, Callback, Acc, VInfo) ->
Workers0 = fabric_util:submit_jobs(Shards, fabric_rpc, map_view, RPCArgs),
RexiMon = fabric_util:create_monitors(Workers0),
try
- case fabric_util:stream_start(Workers0, #shard.ref, StartFun, Repls) of
+ case fabric_streams:start(Workers0, #shard.ref, StartFun, Repls) of
{ok, ddoc_updated} ->
Callback({error, ddoc_updated}, Acc);
{ok, Workers} ->
try
go(DbName, Workers, VInfo, Args, Callback, Acc)
after
- fabric_util:cleanup(Workers)
+ fabric_streams:cleanup(Workers)
end;
{timeout, NewState} ->
DefunctWorkers = fabric_util:remove_done_workers(
diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl
index a74be1073..b2b8a05f0 100644
--- a/src/fabric/src/fabric_view_reduce.erl
+++ b/src/fabric/src/fabric_view_reduce.erl
@@ -35,14 +35,14 @@ go(DbName, DDoc, VName, Args, Callback, Acc, VInfo) ->
Workers0 = fabric_util:submit_jobs(Shards,fabric_rpc,reduce_view,RPCArgs),
RexiMon = fabric_util:create_monitors(Workers0),
try
- case fabric_util:stream_start(Workers0, #shard.ref, StartFun, Repls) of
+ case fabric_streams:start(Workers0, #shard.ref, StartFun, Repls) of
{ok, ddoc_updated} ->
Callback({error, ddoc_updated}, Acc);
{ok, Workers} ->
try
go2(DbName, Workers, VInfo, Args, Callback, Acc)
after
- fabric_util:cleanup(Workers)
+ fabric_streams:cleanup(Workers)
end;
{timeout, NewState} ->
DefunctWorkers = fabric_util:remove_done_workers(