diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2019-02-15 16:08:14 -0500 |
---|---|---|
committer | Nick Vatamaniuc <vatamane@apache.org> | 2019-02-15 16:42:43 -0500 |
commit | 133f1599010b913735b1d8a9939c91b03fa0395c (patch) | |
tree | 1925b39f303ba95d1ad432cae454de2500f8c6b9 | |
parent | 48738222cc8d36c7ed43b07ac79622543590a6da (diff) | |
download | couchdb-133f1599010b913735b1d8a9939c91b03fa0395c.tar.gz |
When db is deleted, delete all completed jobs as well
-rw-r--r-- | src/mem3/src/mem3_reshard.erl | 80 | ||||
-rw-r--r-- | src/mem3/src/mem3_reshard.hrl | 3 | ||||
-rw-r--r-- | src/mem3/src/mem3_reshard_dbdoc.erl | 2 | ||||
-rw-r--r-- | src/mem3/test/mem3_reshard_api_test.erl | 23 |
4 files changed, 91 insertions, 17 deletions
diff --git a/src/mem3/src/mem3_reshard.erl b/src/mem3/src/mem3_reshard.erl index 2b968d106..7cd510c85 100644 --- a/src/mem3/src/mem3_reshard.erl +++ b/src/mem3/src/mem3_reshard.erl @@ -170,11 +170,13 @@ init(_) -> couch_log:notice("~p start init()", [?MODULE]), EtsOpts = [named_table, {keypos, #job.id}, {read_concurrency, true}], ?MODULE = ets:new(?MODULE, EtsOpts), + ManagerPid = self(), State = #state{ state = running, state_info = [], update_time = now_sec(), - node = node() + node = node(), + db_monitor = spawn_link(fun() -> db_monitor(ManagerPid) end) }, State1 = mem3_reshard_store:init(State, ?JOB_PREFIX, state_id()), State2 = mem3_reshard_store:load_state(State1), @@ -184,6 +186,8 @@ init(_) -> terminate(Reason, State) -> couch_log:notice("~p terminate ~p ~p", [?MODULE, Reason, statefmt(State)]), + catch unlink(State#state.db_monitor), + catch exit(State#state.db_monitor, kill), [kill_job_int(Job) || Job <- running_jobs()], ok. @@ -279,19 +283,8 @@ handle_call({stop_job, Id, Reason}, _From, State) -> {reply, {error, not_found}, State} end; - handle_call({remove_job, Id}, _From, State) -> - couch_log:notice("~p call remove_job Id:~p", [?MODULE, Id]), - case job_by_id(Id) of - #job{} = Job -> - ok = stop_job_int(Job, stopped, "Removed by user", State), - ok = mem3_reshard_store:delete_job(State, Id), - ets:delete(?MODULE, Job#job.id), - {reply, ok, State}; - not_found -> - {reply, {error, not_found}, State} - end; - + {reply, remove_job_int(Id, State), State}; handle_call(get_state, _From, #state{state = GlobalState} = State) -> StateProps = mem3_reshard_store:state_to_ejson_props(State), @@ -319,6 +312,11 @@ handle_call(Call, From, State) -> couch_log:error("~p unknown call ~p from: ~p", [?MODULE, Call, From]), {noreply, State}. +handle_cast({db_deleted, JobIds}, State) -> + LogMsg = "~p removing ~p jobs after db was deleted", + couch_log:notice(LogMsg, [?MODULE, length(JobIds)]), + [remove_job_int(JobId, State) || JobId <- JobIds], + {noreply, State}; handle_cast({report, Job}, State) -> report_int(Job), @@ -686,6 +684,20 @@ report_int(Job) -> end. +-spec remove_job_int(#job{}, #state{}) -> ok | {error, not_found}. +remove_job_int(Id, State) -> + couch_log:notice("~p call remove_job Id:~p", [?MODULE, Id]), + case job_by_id(Id) of + #job{} = Job -> + kill_job_int(Job), + ok = mem3_reshard_store:delete_job(State, Id), + ets:delete(?MODULE, Job#job.id), + ok; + not_found -> + {error, not_found} + end. + + % This function is for testing and debugging only -spec reset_state(#state{}) -> #state{}. reset_state(#state{} = State) -> @@ -719,7 +731,6 @@ update_job_history(#job{job_state = St, update_time = Ts} = Job) -> Job#job{history = update_history(St, Reason, Ts, Hist)}. - -spec update_history_rev(atom(), binary() | null, time_sec(), list()) -> list(). update_history(State, State, Ts, History) -> % State is the same as detail. Make the detail null to avoid duplication @@ -754,6 +765,47 @@ max_history() -> config:get_integer("reshard", "max_history", ?DEFAULT_MAX_HISTORY). + +-spec completed_jobs_by_dbname(binary()) -> [#job{}]. +completed_jobs_by_dbname(Name) -> + DbName = mem3:dbname(Name), + Pat = #job{ + job_state = completed, + source = #shard{dbname = DbName, _ = '_'}, + _ = '_' + }, + [JobId || #job{id = JobId} <- ets:match_object(?MODULE, Pat)]. + + +-spec db_monitor(pid()) -> no_return(). +db_monitor(Server) -> + couch_log:notice("~p db monitor ~p starting", [?MODULE, self()]), + EvtRef = erlang:monitor(process, couch_event_server), + couch_event:register_all(self()), + db_monitor_loop(Server, EvtRef). + + +-spec db_monitor_loop(pid(), reference()) -> no_return(). +db_monitor_loop(Server, EvtRef) -> + receive + {'$couch_event', DbName, deleted} -> + case completed_jobs_by_dbname(DbName) of + [] -> + ok; + JobIds -> + gen_server:cast(Server, {db_deleted, JobIds}) + end, + db_monitor_loop(Server, EvtRef); + {'$couch_event', _, _} -> + db_monitor_loop(Server, EvtRef); + {'DOWN', EvtRef, _, _, Info} -> + couch_log:error("~p db monitor listener died ~p", [?MODULE, Info]), + exit({db_monitor_died, Info}); + Msg -> + couch_log:error("~p db monitor unexpected msg ~p", [?MODULE, Msg]), + db_monitor_loop(Server, EvtRef) + end. + -spec statefmt(#state{} | term()) -> string(). statefmt(#state{state = StateName}) -> Total = ets:info(?MODULE, size), diff --git a/src/mem3/src/mem3_reshard.hrl b/src/mem3/src/mem3_reshard.hrl index 175377f9b..770dc7459 100644 --- a/src/mem3/src/mem3_reshard.hrl +++ b/src/mem3/src/mem3_reshard.hrl @@ -69,7 +69,8 @@ update_time :: non_neg_integer(), job_prefix :: binary(), state_id :: binary(), - node :: node() + node :: node(), + db_monitor :: pid() }). diff --git a/src/mem3/src/mem3_reshard_dbdoc.erl b/src/mem3/src/mem3_reshard_dbdoc.erl index 8dd6f1b62..135f57f0d 100644 --- a/src/mem3/src/mem3_reshard_dbdoc.erl +++ b/src/mem3/src/mem3_reshard_dbdoc.erl @@ -48,7 +48,7 @@ update_shard_map(#job{source = Source, target = Target} = Job) -> end catch _:Err -> - exit({shard_update_error, Err}) + exit(Err) end, LogMsg2 = "~p : update_shard_map on node returned. ~p", couch_log:notice(LogMsg2, [?MODULE, Node]), diff --git a/src/mem3/test/mem3_reshard_api_test.erl b/src/mem3/test/mem3_reshard_api_test.erl index 270c3e2cf..70640f327 100644 --- a/src/mem3/test/mem3_reshard_api_test.erl +++ b/src/mem3/test/mem3_reshard_api_test.erl @@ -101,7 +101,8 @@ mem3_reshard_api_test_() -> fun recover_in_wait_source_close/1, fun recover_in_topoff3/1, fun recover_in_source_delete/1, - fun check_max_jobs/1 + fun check_max_jobs/1, + fun cleanup_completed_jobs/1 ] } } @@ -680,6 +681,17 @@ check_max_jobs(Top) -> end). +cleanup_completed_jobs(Top) -> + ?_test(begin + Body = #{type => split, db => <<?DB1>>}, + {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body), + JobUrl = Top ++ ?JOBS ++ ?b2l(Id), + wait_state(JobUrl ++ "/state", <<"completed">>), + delete_db(Top, ?DB1), + wait_for_http_code(JobUrl, 404) + end). + + % Test help functions wait_to_complete_then_cleanup(Top, Jobs) -> @@ -729,6 +741,15 @@ wait_state(Url, State) -> end, 30000). +wait_for_http_code(Url, Code) when is_integer(Code) -> + test_util:wait(fun() -> + case req(get, Url) of + {Code, _} -> ok; + {_, _} -> timer:sleep(100), wait + end + end, 30000). + + delete_source_in_state(Top, Db, State) when is_atom(State) -> intercept_state(State), Body = #{type => split, db => list_to_binary(Db)}, |