summaryrefslogtreecommitdiff
path: root/src/mem3/src/mem3_sync.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/mem3/src/mem3_sync.erl')
-rw-r--r--src/mem3/src/mem3_sync.erl15
1 files changed, 11 insertions, 4 deletions
diff --git a/src/mem3/src/mem3_sync.erl b/src/mem3/src/mem3_sync.erl
index 179435965..f6997860d 100644
--- a/src/mem3/src/mem3_sync.erl
+++ b/src/mem3/src/mem3_sync.erl
@@ -45,10 +45,14 @@
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
+-define(DEFAULT_CONCURRENCY, 10).
+-define(DEFAULT_BATCH_SIZE, 500).
+-define(DEFAULT_BATCH_COUNT, 5).
+
-record(state, {
active = [],
count = 0,
- limit,
+ limit = ?DEFAULT_CONCURRENCY,
dict = dict:new(),
waiting = queue:new()
}).
@@ -87,10 +91,10 @@ remove_shard(Shard) ->
init([]) ->
process_flag(trap_exit, true),
- Concurrency = config:get("mem3", "sync_concurrency", "10"),
+ Concurrency = config:get_integer("mem3", "sync_concurrency", ?DEFAULT_CONCURRENCY),
gen_event:add_handler(mem3_events, mem3_sync_event, []),
initial_sync(),
- {ok, #state{limit = list_to_integer(Concurrency)}}.
+ {ok, #state{limit = Concurrency}}.
handle_call({push, Job}, From, State) ->
handle_cast({push, Job#job{pid = From}}, State);
@@ -236,7 +240,10 @@ start_push_replication(#job{name = Name, node = Node, pid = From}) ->
true -> ok
end,
spawn_link(fun() ->
- case mem3_rep:go(Name, maybe_redirect(Node)) of
+ BatchSize = config:get_integer("mem3", "sync_batch_size", ?DEFAULT_BATCH_SIZE),
+ BatchCount = config:get_integer("mem3", "sync_batch_count", ?DEFAULT_BATCH_COUNT),
+ Opts = [{batch_size, BatchSize}, {batch_count, BatchCount}],
+ case mem3_rep:go(Name, maybe_redirect(Node), Opts) of
{ok, Pending} when Pending > 0 ->
exit({pending_changes, Pending});
_ ->