diff options
-rw-r--r-- | src/mem3/src/mem3.erl | 20 | ||||
-rw-r--r-- | src/mem3/src/mem3_rep.erl | 8 | ||||
-rw-r--r-- | src/mem3/src/mem3_sync.erl | 11 |
3 files changed, 33 insertions, 6 deletions
diff --git a/src/mem3/src/mem3.erl b/src/mem3/src/mem3.erl index dc666fdae..6f3a10df8 100644 --- a/src/mem3/src/mem3.erl +++ b/src/mem3/src/mem3.erl @@ -22,6 +22,7 @@ -export([belongs/2, owner/3]). -export([get_placement/1]). -export([ping/1, ping/2]). +-export([db_is_current/1]). %% For mem3 use only. -export([name/1, node/1, range/1, engine/1]). @@ -367,6 +368,25 @@ ping(Node, Timeout) when is_atom(Node) -> pang end. + +db_is_current(#shard{name = Name}) -> + db_is_current(Name); + +db_is_current(<<"shards/", _/binary>> = Name) -> + try + Shards = mem3:shards(mem3:dbname(Name)), + lists:keyfind(Name, #shard.name, Shards) =/= false + catch + error:database_does_not_exist -> + false + end; + +db_is_current(Name) when is_binary(Name) -> + % This accounts for local (non-sharded) dbs, and is mostly + % for unit tests that either test or use mem3_rep logic + couch_server:exists(Name). + + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl index d5b42d315..d2edd6c4d 100644 --- a/src/mem3/src/mem3_rep.erl +++ b/src/mem3/src/mem3_rep.erl @@ -64,7 +64,13 @@ go(DbName, Node, Opts) when is_binary(DbName), is_atom(Node) -> go(#shard{name=DbName, node=node()}, #shard{name=DbName, node=Node}, Opts); go(#shard{} = Source, #shard{} = Target, Opts) -> - go(Source, targets_map(Source, Target), Opts); + case mem3:db_is_current(Source) of + true -> + go(Source, targets_map(Source, Target), Opts); + false -> + % Database could have been recreated + {error, missing_source} + end; go(#shard{} = Source, #{} = Targets0, Opts) when map_size(Targets0) > 0 -> Targets = maps:map(fun(_, T) -> #tgt{shard = T} end, Targets0), diff --git a/src/mem3/src/mem3_sync.erl b/src/mem3/src/mem3_sync.erl index 693fc4f31..8170f3c1a 100644 --- a/src/mem3/src/mem3_sync.erl +++ b/src/mem3/src/mem3_sync.erl @@ -140,11 +140,12 @@ handle_info({'EXIT', Active, Reason}, State) -> case Reason of {pending_changes, Count} -> maybe_resubmit(State, Job#job{pid = nil, count = Count}); _ -> - try mem3:shards(mem3:dbname(Job#job.name)) of _ -> - timer:apply_after(5000, ?MODULE, push, [Job#job{pid=nil}]) - catch error:database_does_not_exist -> - % no need to retry - ok + case mem3:db_is_current(Job#job.name) of + true -> + timer:apply_after(5000, ?MODULE, push, [Job#job{pid=nil}]); + false -> + % no need to retry (db deleted or recreated) + ok end, State end; |