| field | value | date |
|---|---|---|
| author | Nick Vatamaniuc <vatamane@apache.org> | 2019-01-31 13:43:28 -0500 |
| committer | Nick Vatamaniuc <vatamane@apache.org> | 2019-02-15 16:42:43 -0500 |
| commit | 107331d600a0c2dba342aa823d5dd5d83f5ddc5a (patch) | |
| tree | b11278c7d71a8cc8fef5bb4fc7aa51985b7d6dd1 | |
| parent | d6b1d634a0b6ffbec78f77bd220123ccfb569361 (diff) | |
| download | couchdb-107331d600a0c2dba342aa823d5dd5d83f5ddc5a.tar.gz | |
API changes from mailing list feedback
Internally there is now support to stop and resume individual jobs. This works
alongside the existing feature of disabling / enabling shard splitting globally
on the cluster.
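
For example, with the new per-job endpoint listed below, a single job can be
stopped and later resumed without touching the cluster-wide setting. A rough
sketch (request bodies are `application/json`; `$jobid` is a placeholder):

```
PUT /_shard_splits/jobs/$jobid/state
{"state": "stopped", "reason": "Pausing just this job"}
-> {"ok": true}

PUT /_shard_splits/jobs/$jobid/state
{"state": "running"}
-> {"ok": true}
```

An unknown job id yields a 404, and a `state` other than `running` or
`stopped` is rejected as a bad request.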
API changes:
* _shard_splits/jobs/$jobid/state GET / PUT support
* _shard_splits/state GET / PUT support, no more PUTs to _shard_splits
* More reasonable summary for _shard_splits GET, for example:
```
{
    "completed": 1,
    "failed": 0,
    "running": 0,
    "state": "stopped",
    "stopped": 0,
    "total": 1
}
```
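
The cluster-wide switch also gets its own resource. A sketch of how it might
be used (the `reason` field is optional; when omitted on stop, a generic
reason is recorded):

```
GET /_shard_splits/state
-> {"state": "running"}

PUT /_shard_splits/state
{"state": "stopped", "reason": "Cluster maintenance"}
```

Note that when splitting is stopped globally, jobs that were running keep
their checkpointed `running` state internally, but the summary above
intentionally reports them as `stopped` so the counts are not misleading.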
-rw-r--r-- | src/mem3/src/mem3_shard_split.erl       | 123
-rw-r--r-- | src/mem3/src/mem3_shard_split_httpd.erl | 205

2 files changed, 260 insertions, 68 deletions
diff --git a/src/mem3/src/mem3_shard_split.erl b/src/mem3/src/mem3_shard_split.erl
index cedc7d0b6..f99b97855 100644
--- a/src/mem3/src/mem3_shard_split.erl
+++ b/src/mem3/src/mem3_shard_split.erl
@@ -17,6 +17,8 @@
 -export([
     start_job/1,
     remove_job/1,
+    stop_job/2,
+    resume_job/1,
     jobs/0,
     job/1,
     shard_from_name/1,
@@ -84,7 +86,17 @@ start_job(#shard{} = Source, Split) ->
     end.


--spec remove_job(binary()) -> ok | not_found.
+-spec stop_job(binary(), binary()) -> ok | {error, any()}.
+stop_job(JobId, Reason) when is_binary(JobId), is_binary(Reason) ->
+    gen_server:call(?MODULE, {stop_job, JobId, Reason}, infinity).
+
+
+-spec resume_job(binary()) -> ok | {error, any()}.
+resume_job(JobId) when is_binary(JobId) ->
+    gen_server:call(?MODULE, {resume_job, JobId}, infinity).
+
+
+-spec remove_job(binary()) -> ok | {error, not_found}.
 remove_job(JobId) when is_binary(JobId) ->
     gen_server:call(?MODULE, {remove_job, JobId}, infinity).

@@ -150,7 +162,7 @@ init(_) ->
     ?MODULE = ets:new(?MODULE, EtsOpts),
     State = #state{
         state = running,
-        state_info = [{reason, <<"Started">>}],
+        state_info = [],
         time_updated = now_sec(),
         node = node(),
         db_monitor = spawn_link(?MODULE, db_monitor, [self()])
@@ -165,7 +177,7 @@ terminate(Reason, State) ->
     couch_log:notice("~p terminate ~p ~p", [?MODULE, Reason, statefmt(State)]),
     catch unlink(State#state.db_monitor),
     catch exit(State#state.db_monitor, kill),
-    stop_jobs(<<"Shard splitting stopped">>, State),
+    [kill_job_int(Job, State) || Job <- running_jobs()],
     ok.


@@ -189,7 +201,7 @@ handle_call({stop, Reason}, _From, #state{state = running} = State) ->
         state_info = info_update(reason, Reason, State#state.state_info)
     },
     ok = mem3_shard_split_store:store_state(State1),
-    ok = stop_jobs(<<"Shard splitting disabled">>, State1),
+    [kill_job_int(Job, State1) || Job <- running_jobs()],
     {reply, ok, State1};

 handle_call({stop, _}, _From, State) ->
@@ -224,6 +236,43 @@ handle_call({start_job, #job{id = Id, source = Source} = Job}, _From, State) ->
             {reply, SourceError, State}
     end;

+handle_call({resume_job, _}, _From, #state{state = stopped} = State) ->
+    case couch_util:get_value(reason, State#state.state_info) of
+        undefined ->
+            {reply, {error, stopped}, State};
+        Reason ->
+            {reply, {error, {stopped, Reason}}, State}
+    end;
+
+handle_call({resume_job, Id}, _From, State) ->
+    couch_log:notice("~p resume_job call ~p", [?MODULE, Id]),
+    case job_by_id(Id) of
+        #job{job_state = stopped} = Job ->
+            case start_job_int(Job, State) of
+                ok ->
+                    {reply, ok, State};
+                {error, Error} ->
+                    {reply, {error, Error}, State}
+            end;
+        #job{} ->
+            {reply, ok, State};
+        not_found ->
+            {reply, {error, not_found}, State}
+    end;
+
+handle_call({stop_job, Id, Reason}, _From, State) ->
+    couch_log:notice("~p stop_job Id:~p Reason:~p", [?MODULE, Id, Reason]),
+    case job_by_id(Id) of
+        #job{job_state = running} = Job ->
+            ok = stop_job_int(Job, stopped, Reason, State),
+            {reply, ok, State};
+        #job{} ->
+            {reply, ok, State};
+        not_found ->
+            {reply, {error, not_found}, State}
+    end;
+
+
 handle_call({remove_job, Id}, _From, State) ->
     couch_log:notice("~p call remove_job Id:~p", [?MODULE, Id]),
     case job_by_id(Id) of
@@ -243,15 +292,24 @@ handle_call({report, Job}, {FromPid, _}, State) ->
     report_int(Job, FromPid),
     {reply, ok, State};

-handle_call(get_state, _From, #state{} = State) ->
+handle_call(get_state, _From, #state{state = GlobalState} = State) ->
     StateProps = mem3_shard_split_store:state_to_ejson_props(State),
+    Stats0 = #{running => 0, completed => 0, failed => 0, stopped => 0},
+    StateStats = ets:foldl(fun(#job{job_state = State}, Acc) ->
+        % When jobs are disabled globally their state is not checkpointed as
+        % "stopped", but it stays as "running". But when returning the state we
+        % don't want to mislead and indicate that there are "N running jobs"
+        % when the global state is "stopped".
+        State1 = case GlobalState =:= stopped andalso State =:= running of
+            true -> stopped;
+            false -> State
+        end,
+        Acc#{State => maps:get(State1, Acc, 0) + 1}
+    end, #{}, ?MODULE),
     Total = ets:info(?MODULE, size),
-    Running = mem3_shard_split_job_sup:count_children(),
-    StateProps1 = StateProps ++ [
-        {jobs_total, Total},
-        {jobs_running, Running}
-    ],
-    {reply, {StateProps1}, State};
+    StateStats1 = maps:to_list(StateStats) ++ [{total, Total}],
+    Result = {lists:sort(StateProps ++ StateStats1)},
+    {reply, Result, State};

 handle_call(reset_state, _From, State) ->
     {reply, ok, reset_state(State)};
@@ -308,7 +366,7 @@ reload_jobs(State) ->
     lists:foldl(fun reload_job/2, State, Jobs).


-% This is a case when main application is stopped but a job is reload that was
+% This is a case when main application is stopped but a job is reloaded that was
 % checkpointed in running state. Set that state to stopped to avoid the API
 % results looking odd.
 -spec reload_job(#job{}, #state{}) -> #state{}.
@@ -325,9 +383,8 @@ reload_job(#job{job_state = running} = Job, #state{state = stopped} = State) ->
     true = ets:insert(?MODULE, Job1),
     State;

-% This is a case when a job process is spawned.
-reload_job(#job{job_state = JSt} = Job, #state{state = running} = State)
-        when JSt =:= running orelse JSt =:= stopped ->
+% This is a case when a job process should be spawend
+reload_job(#job{job_state = running} = Job, #state{state = running} = State) ->
     case start_job_int(Job, State) of
         ok ->
             State;
@@ -337,7 +394,8 @@ reload_job(#job{job_state = JSt} = Job, #state{state = running} = State)
             State
     end;

-% The default case is to just load the job into the ets table.
+% The default case is to just load the job into the ets table. This would be
+% failed or completed jobs for example
 reload_job(#job{} = Job, #state{} = State) ->
     true = ets:insert(?MODULE, Job),
     State.
@@ -384,34 +442,41 @@ spawn_job(#job{} = Job0) ->
     end.


--spec stop_jobs(term(), #state{}) -> ok.
-stop_jobs(Reason, State) ->
-    couch_log:notice("~p stop_jobs reason:~p", [?MODULE, Reason]),
-    [stop_job_int(Job, stopped, Reason, State) || Job <- running_jobs()],
-    ok.
-
-
 -spec stop_job_int(#job{}, job_state(), term(), #state{}) -> ok.
 stop_job_int(#job{pid = undefined}, _JobState, _Reason, _State) ->
     ok;

 stop_job_int(#job{} = Job, JobState, Reason, State) ->
     couch_log:info("~p stop_job_int ~p : ~p", [?MODULE, jobfmt(Job), Reason]),
-    ok = mem3_shard_split_job_sup:terminate_child(Job#job.pid),
-    demonitor(Job#job.ref, [flush]),
-    Job1 = Job#job{
-        pid = undefined,
-        ref = undefined,
+    Job1 = kill_job_int(Job, State),
+    Job2 = Job1#job{
         job_state = JobState,
         time_updated = now_sec(),
         state_info = [{reason, Reason}]
     },
     ok = mem3_shard_split_store:store_job(State, Job1),
-    true = ets:insert(?MODULE, Job1),
     couch_log:info("~p stop_job_int stopped ~p", [?MODULE, jobfmt(Job1)]),
     ok.


+-spec kill_job_int(#job{}, #state{}) -> #job{}.
+kill_job_int(#job{pid = undefined} = Job, State) ->
+    Job;
+
+kill_job_int(#job{pid = Pid, ref = Ref} = Job, State) ->
+    couch_log:info("~p kill_job_int ~p", [?MODULE, jobfmt(Job)]),
+    case erlang:is_process_alive(Pid) of
+        true ->
+            ok = mem3_shard_split_job_sup:terminate_child(Pid);
+        false ->
+            ok
+    end,
+    demonitor(Ref, [flush]),
+    Job1 = Job#job{pid = undefined, ref = undefined},
+    true = ets:insert(?MODULE, Job1),
+    Job1.
+
+
 -spec handle_job_exit(#job{}, term(), #state{}) -> ok.
 handle_job_exit(#job{split_state = completed} = Job, normal, State) ->
     couch_log:notice("~p completed job ~s exited", [?MODULE, Job#job.id]),
diff --git a/src/mem3/src/mem3_shard_split_httpd.erl b/src/mem3/src/mem3_shard_split_httpd.erl
index 0d1b6dea3..3d23cb4c0 100644
--- a/src/mem3/src/mem3_shard_split_httpd.erl
+++ b/src/mem3/src/mem3_shard_split_httpd.erl
@@ -29,53 +29,77 @@
 -include_lib("couch/include/couch_db.hrl").

+
+-define(JOBS, <<"jobs">>).
+-define(STATE, <<"state">>).
+-define(S_RUNNING, <<"running">>).
+-define(S_STOPPED, <<"stopped">>).
+
 % GET /_shard_splits
 %
 handle_shard_splits_req(#httpd{method='GET', path_parts=[_]} = Req) ->
-    State = get_state(),
+    State = get_summary(),
     send_json(Req, State);

-% PUT /_shard_splits {"start": true} | {"stop":"Reason..."}
+handle_shard_splits_req(#httpd{path_parts=[_]} = Req) ->
+    send_method_not_allowed(Req, "GET,HEAD");
+
+
+% GET /_shard_splits/state
+%
+handle_shard_splits_req(#httpd{method='GET',
+        path_parts=[_, ?STATE]} = Req) ->
+    State = get_shard_splitting_state(),
+    send_json(Req, {[{state, State}]});
+
+% PUT /_shard_splits/state
 %
-handle_shard_splits_req(#httpd{method='PUT', path_parts=[_]} = Req) ->
+handle_shard_splits_req(#httpd{method='PUT',
+        path_parts=[_, ?STATE]} = Req) ->
     couch_httpd:validate_ctype(Req, "application/json"),
     {Props} = couch_httpd:json_body_obj(Req),
-    Start = couch_util:get_value(<<"start">>, Props),
-    Stop = couch_util:get_value(<<"stop">>, Props),
-    case {Start, Stop} of
-        {undefined, undefined} ->
-            throw({bad_request, <<"Expected a `start` or `stop` field">>});
-        {Start, undefined} when Start =/= undefined ->
+    State = couch_util:get_value(<<"state">>, Props),
+    Reason = couch_util:get_value(<<"reason">>, Props),
+    case {State, Reason} of
+        {undefined, _} ->
+            throw({bad_request, <<"Expected a `state` field">>});
+        {?S_RUNNING, _} ->
             case start_shard_splitting() of
                 {ok, JsonResult} ->
                     send_json(Req, 200, JsonResult);
                 {error, JsonError} ->
                     send_json(Req, 500, JsonError)
             end;
-        {undefined, Reason} when is_binary(Reason) orelse Reason =:= true ->
-            case stop_shard_splitting(Reason) of
+        {?S_STOPPED, Reason} ->
+            Reason1 = case Reason =:= undefined of false -> Reason; true ->
+                <<"Shard splitting stopped on the cluster by user">>
+            end,
+            case stop_shard_splitting(Reason1) of
                 {ok, JsonResult} ->
                     send_json(Req, 200, JsonResult);
                 {error, JsonError} ->
                     send_json(Req, 500, JsonError)
             end;
         {_, _} ->
-            throw({bad_request, <<"Invalid `start` or `stop` fields">>})
-    end;
+            throw({bad_request, <<"State field not `running` or `stopped`">>})
+    end;

-handle_shard_splits_req(#httpd{path_parts=[_]} = Req) ->
+
+handle_shard_splits_req(#httpd{path_parts=[_, ?STATE]} = Req) ->
     send_method_not_allowed(Req, "GET,HEAD,PUT");

+
 % GET /_shard_splits/jobs
 %
-handle_shard_splits_req(#httpd{method='GET', path_parts=[_,<<"jobs">>]}=Req) ->
+handle_shard_splits_req(#httpd{method='GET', path_parts=[_,?JOBS]}=Req) ->
     Jobs = get_jobs(),
     Total = length(Jobs),
     send_json(Req, {[{total_rows, Total}, {offset, 0}, {jobs, Jobs}]});

 % POST /_shard_splits/jobs {"node": "...", "shard": "..."}
 %
-handle_shard_splits_req(#httpd{method = 'POST', path_parts=[_,<<"jobs">>]} = Req) ->
+handle_shard_splits_req(#httpd{method = 'POST',
+        path_parts=[_, ?JOBS]} = Req) ->
     couch_httpd:validate_ctype(Req, "application/json"),
     JobProps = couch_httpd:json_body_obj(Req),
     Node = get_jobs_post_node(JobProps),
@@ -93,12 +117,14 @@ handle_shard_splits_req(#httpd{method = 'POST', path_parts=[_,<<"jobs">>]} = Req
             send_json(Req, 500, {[{error, to_binary(Error)}]})
     end;

-handle_shard_splits_req(#httpd{path_parts=[_,<<"jobs">>]} = Req) ->
+handle_shard_splits_req(#httpd{path_parts=[_, ?JOBS]} = Req) ->
     send_method_not_allowed(Req, "GET,HEAD,POST");

-% GET /_shard_splits/jobs/<jobid>
+
+% GET /_shard_splits/jobs/$jobid
 %
-handle_shard_splits_req(#httpd{method='GET', path_parts=[_,<<"jobs">>,JobId]}=Req) ->
+handle_shard_splits_req(#httpd{method='GET',
+        path_parts=[_, ?JOBS, JobId]}=Req) ->
     case get_job(JobId) of
         {ok, JobInfo} ->
             send_json(Req, JobInfo);
@@ -106,9 +132,10 @@ handle_shard_splits_req(#httpd{method='GET', path_parts=[_,<<"jobs">>,JobId]}=Re
             throw(not_found)
     end;

-% DELETE /_shard_splits/jobs/<jobid>
+% DELETE /_shard_splits/jobs/$jobid
 %
-handle_shard_splits_req(#httpd{method='DELETE', path_parts=[_,<<"jobs">>,JobId]}=Req) ->
+handle_shard_splits_req(#httpd{method='DELETE',
+        path_parts=[_, ?JOBS, JobId]}=Req) ->
     case get_job(JobId) of
         {ok, {Props}} ->
             NodeBin = couch_util:get_value(node, Props),
@@ -116,7 +143,7 @@ handle_shard_splits_req(#httpd{method='DELETE', path_parts=[_,<<"jobs">>,JobId]}
             case rpc:call(Node, mem3_shard_split, remove_job, [JobId]) of
                 ok ->
                     send_json(Req, 200, {[{ok, true}]});
-                not_found ->
+                {error, not_found} ->
                     throw(not_found)
             end;
         {error, not_found} ->
@@ -124,8 +151,62 @@ handle_shard_splits_req(#httpd{method='DELETE', path_parts=[_,<<"jobs">>,JobId]}
     end;

-handle_shard_splits_req(#httpd{path_parts=[_,<<"jobs">>,_]} = Req) ->
-    send_method_not_allowed(Req, "GET,HEAD,DELETE").
+handle_shard_splits_req(#httpd{path_parts=[_, ?JOBS, _]} = Req) ->
+    send_method_not_allowed(Req, "GET,HEAD,DELETE");
+
+
+% GET /_shard_splits/jobs/$jobid/state
+%
+
+handle_shard_splits_req(#httpd{method='GET',
+        path_parts=[_, ?JOBS, JobId, ?STATE]} = Req) ->
+    case get_job(JobId) of
+        {ok, {Props}} ->
+            JobState = couch_util:get_value(job_state, Props),
+            {InfoProps} = couch_util:get_value(state_info, Props, {[]}),
+            send_json(Req, 200, {[{state, JobState}]});
+        {error, not_found} ->
+            throw(not_found)
+    end;
+
+% PUT /_shard_splits/jobs/$jobid/state
+
+handle_shard_splits_req(#httpd{method='PUT',
+        path_parts=[_, ?JOBS, JobId, ?STATE]} = Req) ->
+    couch_httpd:validate_ctype(Req, "application/json"),
+    {Props} = couch_httpd:json_body_obj(Req),
+    State = couch_util:get_value(<<"state">>, Props),
+    Reason = couch_util:get_value(<<"reason">>, Props),
+    case {State, Reason} of
+        {undefined, _} ->
+            throw({bad_request, <<"Expected a `state` field">>});
+        {?S_RUNNING, _} ->
+            case resume_job(JobId) of
+                ok ->
+                    send_json(Req, 200, {[{ok, true}]});
+                {error, not_found} ->
+                    throw(not_found);
+                {error, JsonError} ->
+                    send_json(Req, 500, JsonError)
+            end;
+        {?S_STOPPED, Reason} ->
+            Reason1 = case Reason =:= undefined of false -> Reason; true ->
+                <<"Stopped by user">>
+            end,
+            case stop_job(JobId, Reason1) of
+                ok ->
+                    send_json(Req, 200, {[{ok, true}]});
+                {error, not_found} ->
+                    throw(not_found);
+                {error, JsonError} ->
+                    send_json(Req, 500, JsonError)
+            end;
+        {_, _} ->
+            throw({bad_request, <<"State field not `running` or `stopped`">>})
+    end;
+
+handle_shard_splits_req(#httpd{path_parts=[_, ?JOBS ,_, ?STATE ]} = Req) ->
+    send_method_not_allowed(Req, "GET,HEAD,PUT").


 % Private helper functions

@@ -175,23 +256,69 @@ get_job(JobId) ->
     end.


-get_state() ->
+resume_job(JobId) ->
+    Nodes = mem3_util:live_nodes(),
+    {Replies, _Bad} = rpc:multicall(Nodes, mem3_shard_split, resume_job,
+        [JobId]),
+    WithoutNotFound = [R || R <- Replies, R =/= {error, not_found}],
+    case lists:usort(WithoutNotFound) of
+        [ok] ->
+            ok;
+        [{error, Error} | _] ->
+            {error, {[{error, to_binary(Error)}]}};
+        [] ->
+            {error, not_found}
+    end.
+
+
+stop_job(JobId, Reason) ->
+    Nodes = mem3_util:live_nodes(),
+    {Replies, _Bad} = rpc:multicall(Nodes, mem3_shard_split, stop_job,
+        [JobId, Reason]),
+    WithoutNotFound = [R || R <- Replies, R =/= {error, not_found}],
+    case lists:usort(WithoutNotFound) of
+        [ok] ->
+            ok;
+        [{error, Error} | _] ->
+            {error, {[{error, to_binary(Error)}]}};
+        [] ->
+            {error, not_found}
+    end.
+
+
+get_summary() ->
     Nodes = mem3_util:live_nodes(),
     {Replies, _Bad} = rpc:multicall(Nodes, mem3_shard_split, get_state, []),
-    Acc = lists:foldl(fun({Res}, {RAcc, TAcc, States}) ->
-        Running = couch_util:get_value(jobs_running, Res, 0),
-        Total = couch_util:get_value(jobs_total, Res, 0),
-        Node = couch_util:get_value(node, Res),
-        State = couch_util:get_value(state, Res),
-        States1 = orddict:append(State, Node, States),
-        {Running + RAcc, Total + TAcc, States1}
-    end, {0, 0, orddict:new()}, Replies),
-    {Running, Total, States} = Acc,
-    {[
-        {jobs_running, Running},
-        {job_total, Total},
-        {states, {orddict:to_list(States)}}
-    ]}.
+    Stats0 = #{running => 0, total => 0, completed => 0, failed => 0,
+        stopped => 0},
+    {Stats, States} = lists:foldl(fun({Res}, {Stats, States}) ->
+        Stats1 = maps:map(fun(Stat, OldVal) ->
+            OldVal + couch_util:get_value(Stat, Res, 0)
+        end, Stats),
+        NewStates = [couch_util:get_value(state, Res) | States],
+        {Stats1, NewStates}
+    end, {Stats0, []}, Replies),
+    State = case lists:member(?S_RUNNING, States) of
+        true -> running;
+        false -> stopped
+    end,
+    {[{state, State}] ++ lists:sort(maps:to_list(Stats))}.
+
+
+get_shard_splitting_state() ->
+    Nodes = mem3_util:live_nodes(),
+    {Replies, _Bad} = rpc:multicall(Nodes, mem3_shard_split, get_state, []),
+    Running = lists:foldl(fun({Res}, R) ->
+        case couch_util:get_value(state, Res) of
+            ?S_RUNNING -> R + 1;
+            _ -> R
+        end
+    end, 0, Replies),
+    % If at least one node is "running", then overall state is "running"
+    case Running > 0 of
+        true -> running;
+        false -> stopped
+    end.


 start_shard_splitting() ->