diff options
-rw-r--r-- | src/mem3/src/mem3_sync.erl | 15 |
1 files changed, 11 insertions, 4 deletions
diff --git a/src/mem3/src/mem3_sync.erl b/src/mem3/src/mem3_sync.erl index 179435965..f6997860d 100644 --- a/src/mem3/src/mem3_sync.erl +++ b/src/mem3/src/mem3_sync.erl @@ -45,10 +45,14 @@ -include_lib("mem3/include/mem3.hrl"). -include_lib("couch/include/couch_db.hrl"). +-define(DEFAULT_CONCURRENCY, 10). +-define(DEFAULT_BATCH_SIZE, 500). +-define(DEFAULT_BATCH_COUNT, 5). + -record(state, { active = [], count = 0, - limit, + limit = ?DEFAULT_CONCURRENCY, dict = dict:new(), waiting = queue:new() }). @@ -87,10 +91,10 @@ remove_shard(Shard) -> init([]) -> process_flag(trap_exit, true), - Concurrency = config:get("mem3", "sync_concurrency", "10"), + Concurrency = config:get_integer("mem3", "sync_concurrency", ?DEFAULT_CONCURRENCY), gen_event:add_handler(mem3_events, mem3_sync_event, []), initial_sync(), - {ok, #state{limit = list_to_integer(Concurrency)}}. + {ok, #state{limit = Concurrency}}. handle_call({push, Job}, From, State) -> handle_cast({push, Job#job{pid = From}}, State); @@ -236,7 +240,10 @@ start_push_replication(#job{name = Name, node = Node, pid = From}) -> true -> ok end, spawn_link(fun() -> - case mem3_rep:go(Name, maybe_redirect(Node)) of + BatchSize = config:get_integer("mem3", "sync_batch_size", ?DEFAULT_BATCH_SIZE), + BatchCount = config:get_integer("mem3", "sync_batch_count", ?DEFAULT_BATCH_COUNT), + Opts = [{batch_size, BatchSize}, {batch_count, BatchCount}], + case mem3_rep:go(Name, maybe_redirect(Node), Opts) of {ok, Pending} when Pending > 0 -> exit({pending_changes, Pending}); _ -> |