summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2019-02-15 16:08:14 -0500
committerNick Vatamaniuc <vatamane@apache.org>2019-02-15 16:42:43 -0500
commit133f1599010b913735b1d8a9939c91b03fa0395c (patch)
tree1925b39f303ba95d1ad432cae454de2500f8c6b9
parent48738222cc8d36c7ed43b07ac79622543590a6da (diff)
downloadcouchdb-133f1599010b913735b1d8a9939c91b03fa0395c.tar.gz
When db is deleted, delete all completed jobs as well
-rw-r--r--src/mem3/src/mem3_reshard.erl80
-rw-r--r--src/mem3/src/mem3_reshard.hrl3
-rw-r--r--src/mem3/src/mem3_reshard_dbdoc.erl2
-rw-r--r--src/mem3/test/mem3_reshard_api_test.erl23
4 files changed, 91 insertions, 17 deletions
diff --git a/src/mem3/src/mem3_reshard.erl b/src/mem3/src/mem3_reshard.erl
index 2b968d106..7cd510c85 100644
--- a/src/mem3/src/mem3_reshard.erl
+++ b/src/mem3/src/mem3_reshard.erl
@@ -170,11 +170,13 @@ init(_) ->
couch_log:notice("~p start init()", [?MODULE]),
EtsOpts = [named_table, {keypos, #job.id}, {read_concurrency, true}],
?MODULE = ets:new(?MODULE, EtsOpts),
+ ManagerPid = self(),
State = #state{
state = running,
state_info = [],
update_time = now_sec(),
- node = node()
+ node = node(),
+ db_monitor = spawn_link(fun() -> db_monitor(ManagerPid) end)
},
State1 = mem3_reshard_store:init(State, ?JOB_PREFIX, state_id()),
State2 = mem3_reshard_store:load_state(State1),
@@ -184,6 +186,8 @@ init(_) ->
terminate(Reason, State) ->
couch_log:notice("~p terminate ~p ~p", [?MODULE, Reason, statefmt(State)]),
+ catch unlink(State#state.db_monitor),
+ catch exit(State#state.db_monitor, kill),
[kill_job_int(Job) || Job <- running_jobs()],
ok.
@@ -279,19 +283,8 @@ handle_call({stop_job, Id, Reason}, _From, State) ->
{reply, {error, not_found}, State}
end;
-
handle_call({remove_job, Id}, _From, State) ->
- couch_log:notice("~p call remove_job Id:~p", [?MODULE, Id]),
- case job_by_id(Id) of
- #job{} = Job ->
- ok = stop_job_int(Job, stopped, "Removed by user", State),
- ok = mem3_reshard_store:delete_job(State, Id),
- ets:delete(?MODULE, Job#job.id),
- {reply, ok, State};
- not_found ->
- {reply, {error, not_found}, State}
- end;
-
+ {reply, remove_job_int(Id, State), State};
handle_call(get_state, _From, #state{state = GlobalState} = State) ->
StateProps = mem3_reshard_store:state_to_ejson_props(State),
@@ -319,6 +312,11 @@ handle_call(Call, From, State) ->
couch_log:error("~p unknown call ~p from: ~p", [?MODULE, Call, From]),
{noreply, State}.
+handle_cast({db_deleted, JobIds}, State) ->
+ LogMsg = "~p removing ~p jobs after db was deleted",
+ couch_log:notice(LogMsg, [?MODULE, length(JobIds)]),
+ [remove_job_int(JobId, State) || JobId <- JobIds],
+ {noreply, State};
handle_cast({report, Job}, State) ->
report_int(Job),
@@ -686,6 +684,20 @@ report_int(Job) ->
end.
+-spec remove_job_int(#job{}, #state{}) -> ok | {error, not_found}.
+remove_job_int(Id, State) ->
+ couch_log:notice("~p call remove_job Id:~p", [?MODULE, Id]),
+ case job_by_id(Id) of
+ #job{} = Job ->
+ kill_job_int(Job),
+ ok = mem3_reshard_store:delete_job(State, Id),
+ ets:delete(?MODULE, Job#job.id),
+ ok;
+ not_found ->
+ {error, not_found}
+ end.
+
+
% This function is for testing and debugging only
-spec reset_state(#state{}) -> #state{}.
reset_state(#state{} = State) ->
@@ -719,7 +731,6 @@ update_job_history(#job{job_state = St, update_time = Ts} = Job) ->
Job#job{history = update_history(St, Reason, Ts, Hist)}.
-
-spec update_history_rev(atom(), binary() | null, time_sec(), list()) -> list().
update_history(State, State, Ts, History) ->
% State is the same as detail. Make the detail null to avoid duplication
@@ -754,6 +765,47 @@ max_history() ->
config:get_integer("reshard", "max_history", ?DEFAULT_MAX_HISTORY).
+
+-spec completed_jobs_by_dbname(binary()) -> [#job{}].
+completed_jobs_by_dbname(Name) ->
+ DbName = mem3:dbname(Name),
+ Pat = #job{
+ job_state = completed,
+ source = #shard{dbname = DbName, _ = '_'},
+ _ = '_'
+ },
+ [JobId || #job{id = JobId} <- ets:match_object(?MODULE, Pat)].
+
+
+-spec db_monitor(pid()) -> no_return().
+db_monitor(Server) ->
+ couch_log:notice("~p db monitor ~p starting", [?MODULE, self()]),
+ EvtRef = erlang:monitor(process, couch_event_server),
+ couch_event:register_all(self()),
+ db_monitor_loop(Server, EvtRef).
+
+
+-spec db_monitor_loop(pid(), reference()) -> no_return().
+db_monitor_loop(Server, EvtRef) ->
+ receive
+ {'$couch_event', DbName, deleted} ->
+ case completed_jobs_by_dbname(DbName) of
+ [] ->
+ ok;
+ JobIds ->
+ gen_server:cast(Server, {db_deleted, JobIds})
+ end,
+ db_monitor_loop(Server, EvtRef);
+ {'$couch_event', _, _} ->
+ db_monitor_loop(Server, EvtRef);
+ {'DOWN', EvtRef, _, _, Info} ->
+ couch_log:error("~p db monitor listener died ~p", [?MODULE, Info]),
+ exit({db_monitor_died, Info});
+ Msg ->
+ couch_log:error("~p db monitor unexpected msg ~p", [?MODULE, Msg]),
+ db_monitor_loop(Server, EvtRef)
+ end.
+
-spec statefmt(#state{} | term()) -> string().
statefmt(#state{state = StateName}) ->
Total = ets:info(?MODULE, size),
diff --git a/src/mem3/src/mem3_reshard.hrl b/src/mem3/src/mem3_reshard.hrl
index 175377f9b..770dc7459 100644
--- a/src/mem3/src/mem3_reshard.hrl
+++ b/src/mem3/src/mem3_reshard.hrl
@@ -69,7 +69,8 @@
update_time :: non_neg_integer(),
job_prefix :: binary(),
state_id :: binary(),
- node :: node()
+ node :: node(),
+ db_monitor :: pid()
}).
diff --git a/src/mem3/src/mem3_reshard_dbdoc.erl b/src/mem3/src/mem3_reshard_dbdoc.erl
index 8dd6f1b62..135f57f0d 100644
--- a/src/mem3/src/mem3_reshard_dbdoc.erl
+++ b/src/mem3/src/mem3_reshard_dbdoc.erl
@@ -48,7 +48,7 @@ update_shard_map(#job{source = Source, target = Target} = Job) ->
end
catch
_:Err ->
- exit({shard_update_error, Err})
+ exit(Err)
end,
LogMsg2 = "~p : update_shard_map on node returned. ~p",
couch_log:notice(LogMsg2, [?MODULE, Node]),
diff --git a/src/mem3/test/mem3_reshard_api_test.erl b/src/mem3/test/mem3_reshard_api_test.erl
index 270c3e2cf..70640f327 100644
--- a/src/mem3/test/mem3_reshard_api_test.erl
+++ b/src/mem3/test/mem3_reshard_api_test.erl
@@ -101,7 +101,8 @@ mem3_reshard_api_test_() ->
fun recover_in_wait_source_close/1,
fun recover_in_topoff3/1,
fun recover_in_source_delete/1,
- fun check_max_jobs/1
+ fun check_max_jobs/1,
+ fun cleanup_completed_jobs/1
]
}
}
@@ -680,6 +681,17 @@ check_max_jobs(Top) ->
end).
+cleanup_completed_jobs(Top) ->
+ ?_test(begin
+ Body = #{type => split, db => <<?DB1>>},
+ {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
+ JobUrl = Top ++ ?JOBS ++ ?b2l(Id),
+ wait_state(JobUrl ++ "/state", <<"completed">>),
+ delete_db(Top, ?DB1),
+ wait_for_http_code(JobUrl, 404)
+ end).
+
+
% Test help functions
wait_to_complete_then_cleanup(Top, Jobs) ->
@@ -729,6 +741,15 @@ wait_state(Url, State) ->
end, 30000).
+wait_for_http_code(Url, Code) when is_integer(Code) ->
+ test_util:wait(fun() ->
+ case req(get, Url) of
+ {Code, _} -> ok;
+ {_, _} -> timer:sleep(100), wait
+ end
+ end, 30000).
+
+
delete_source_in_state(Top, Db, State) when is_atom(State) ->
intercept_state(State),
Body = #{type => split, db => list_to_binary(Db)},