diff options
Diffstat (limited to 'src/fabric/src/fabric_doc_missing_revs.erl')
-rw-r--r-- | src/fabric/src/fabric_doc_missing_revs.erl | 81 |
1 files changed, 50 insertions, 31 deletions
diff --git a/src/fabric/src/fabric_doc_missing_revs.erl b/src/fabric/src/fabric_doc_missing_revs.erl index 993c21dc2..ffd408f4e 100644 --- a/src/fabric/src/fabric_doc_missing_revs.erl +++ b/src/fabric/src/fabric_doc_missing_revs.erl @@ -23,48 +23,57 @@ go(DbName, AllIdsRevs) -> go(_, [], _) -> {ok, []}; go(DbName, AllIdsRevs, Options) -> - Workers = lists:map(fun({#shard{name=Name, node=Node} = Shard, IdsRevs}) -> - Ref = rexi:cast(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs, - Options]}), - Shard#shard{ref=Ref} - end, group_idrevs_by_shard(DbName, AllIdsRevs)), - ResultDict = dict:from_list([{Id, {{nil,Revs},[]}} || {Id, Revs} <- AllIdsRevs]), + Workers = lists:map( + fun({#shard{name = Name, node = Node} = Shard, IdsRevs}) -> + Ref = rexi:cast( + Node, + {fabric_rpc, get_missing_revs, [ + Name, + IdsRevs, + Options + ]} + ), + Shard#shard{ref = Ref} + end, + group_idrevs_by_shard(DbName, AllIdsRevs) + ), + ResultDict = dict:from_list([{Id, {{nil, Revs}, []}} || {Id, Revs} <- AllIdsRevs]), RexiMon = fabric_util:create_monitors(Workers), Acc0 = {length(Workers), ResultDict, Workers}, try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of - {timeout, {_, _, DefunctWorkers}} -> - fabric_util:log_timeout( - DefunctWorkers, - "get_missing_revs" - ), - {error, timeout}; - Else -> - Else + {timeout, {_, _, DefunctWorkers}} -> + fabric_util:log_timeout( + DefunctWorkers, + "get_missing_revs" + ), + {error, timeout}; + Else -> + Else after rexi_monitor:stop(RexiMon) end. -handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {_WorkerLen, ResultDict, Workers}) -> - NewWorkers = [W || #shard{node=Node} = W <- Workers, Node =/= NodeRef], +handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Shard, {_WorkerLen, ResultDict, Workers}) -> + NewWorkers = [W || #shard{node = Node} = W <- Workers, Node =/= NodeRef], skip_message({fabric_dict:size(NewWorkers), ResultDict, NewWorkers}); handle_message({rexi_EXIT, _}, Worker, {W, D, Workers}) -> - skip_message({W-1,D,lists:delete(Worker, Workers)}); + skip_message({W - 1, D, lists:delete(Worker, Workers)}); handle_message({ok, Results}, _Worker, {1, D0, _}) -> D = update_dict(D0, Results), {stop, dict:fold(fun force_reply/3, [], D)}; handle_message({ok, Results}, Worker, {WaitingCount, D0, Workers}) -> D = update_dict(D0, Results), case dict:fold(fun maybe_reply/3, {stop, []}, D) of - continue -> - % still haven't heard about some Ids - {ok, {WaitingCount - 1, D, lists:delete(Worker,Workers)}}; - {stop, FinalReply} -> - % finished, stop the rest of the jobs - fabric_util:cleanup(lists:delete(Worker,Workers)), - {stop, FinalReply} + continue -> + % still haven't heard about some Ids + {ok, {WaitingCount - 1, D, lists:delete(Worker, Workers)}}; + {stop, FinalReply} -> + % finished, stop the rest of the jobs + fabric_util:cleanup(lists:delete(Worker, Workers)), + {stop, FinalReply} end. -force_reply(Id, {{nil,Revs}, Anc}, Acc) -> +force_reply(Id, {{nil, Revs}, Anc}, Acc) -> % never heard about this ID, assume it's missing [{Id, Revs, Anc} | Acc]; force_reply(_, {[], _}, Acc) -> @@ -82,14 +91,24 @@ maybe_reply(Id, {Revs, Anc}, {stop, Acc}) -> {stop, [{Id, Revs, Anc} | Acc]}. group_idrevs_by_shard(DbName, IdsRevs) -> - dict:to_list(lists:foldl(fun({Id, Revs}, D0) -> - lists:foldl(fun(Shard, D1) -> - dict:append(Shard, {Id, Revs}, D1) - end, D0, mem3:shards(DbName,Id)) - end, dict:new(), IdsRevs)). + dict:to_list( + lists:foldl( + fun({Id, Revs}, D0) -> + lists:foldl( + fun(Shard, D1) -> + dict:append(Shard, {Id, Revs}, D1) + end, + D0, + mem3:shards(DbName, Id) + ) + end, + dict:new(), + IdsRevs + ) + ). update_dict(D0, KVs) -> - lists:foldl(fun({K,V,A}, D1) -> dict:store(K, {V,A}, D1) end, D0, KVs). + lists:foldl(fun({K, V, A}, D1) -> dict:store(K, {V, A}, D1) end, D0, KVs). skip_message({0, Dict, _Workers}) -> {stop, dict:fold(fun force_reply/3, [], Dict)}; |