author    Nick Vatamaniuc <vatamane@apache.org>    2019-01-31 13:43:28 -0500
committer Nick Vatamaniuc <vatamane@apache.org>    2019-02-15 16:42:43 -0500
commit    107331d600a0c2dba342aa823d5dd5d83f5ddc5a (patch)
tree      b11278c7d71a8cc8fef5bb4fc7aa51985b7d6dd1
parent    d6b1d634a0b6ffbec78f77bd220123ccfb569361 (diff)
download  couchdb-107331d600a0c2dba342aa823d5dd5d83f5ddc5a.tar.gz
API changes from mailing list feedback
Internally there is support to stop / resume individual jobs. It works along
with the existing feature of disabling / enabling shard splitting globally on
the cluster.

API changes:

 * _shard_splits/jobs/$jobid/state GET / PUT support

 * _shard_splits/state GET / PUT support; no more PUTs to _shard_splits itself

 * More reasonable summary for _shard_splits GET:

```
{
    "completed": 1,
    "failed": 0,
    "running": 0,
    "state": "stopped",
    "stopped": 0,
    "total": 1
}
```
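For illustration only (not part of the commit message), the reworked endpoints
exchange bodies like the following; $jobid stays a placeholder and the example
reason text is invented. The response shapes follow the handlers in this patch:

```
GET /_shard_splits/state
  => {"state": "running"}

PUT /_shard_splits/state   {"state": "stopped", "reason": "Rebalancing paused"}

GET /_shard_splits/jobs/$jobid/state
  => {"state": "stopped"}

PUT /_shard_splits/jobs/$jobid/state   {"state": "running"}
  => {"ok": true}
```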
-rw-r--r--  src/mem3/src/mem3_shard_split.erl        123
-rw-r--r--  src/mem3/src/mem3_shard_split_httpd.erl  205
2 files changed, 260 insertions(+), 68 deletions(-)
diff --git a/src/mem3/src/mem3_shard_split.erl b/src/mem3/src/mem3_shard_split.erl
index cedc7d0b6..f99b97855 100644
--- a/src/mem3/src/mem3_shard_split.erl
+++ b/src/mem3/src/mem3_shard_split.erl
@@ -17,6 +17,8 @@
-export([
start_job/1,
remove_job/1,
+ stop_job/2,
+ resume_job/1,
jobs/0,
job/1,
shard_from_name/1,
@@ -84,7 +86,17 @@ start_job(#shard{} = Source, Split) ->
end.
--spec remove_job(binary()) -> ok | not_found.
+-spec stop_job(binary(), binary()) -> ok | {error, any()}.
+stop_job(JobId, Reason) when is_binary(JobId), is_binary(Reason) ->
+ gen_server:call(?MODULE, {stop_job, JobId, Reason}, infinity).
+
+
+-spec resume_job(binary()) -> ok | {error, any()}.
+resume_job(JobId) when is_binary(JobId) ->
+ gen_server:call(?MODULE, {resume_job, JobId}, infinity).
+
+
+-spec remove_job(binary()) -> ok | {error, not_found}.
remove_job(JobId) when is_binary(JobId) ->
gen_server:call(?MODULE, {remove_job, JobId}, infinity).
@@ -150,7 +162,7 @@ init(_) ->
?MODULE = ets:new(?MODULE, EtsOpts),
State = #state{
state = running,
- state_info = [{reason, <<"Started">>}],
+ state_info = [],
time_updated = now_sec(),
node = node(),
db_monitor = spawn_link(?MODULE, db_monitor, [self()])
@@ -165,7 +177,7 @@ terminate(Reason, State) ->
couch_log:notice("~p terminate ~p ~p", [?MODULE, Reason, statefmt(State)]),
catch unlink(State#state.db_monitor),
catch exit(State#state.db_monitor, kill),
- stop_jobs(<<"Shard splitting stopped">>, State),
+ [kill_job_int(Job, State) || Job <- running_jobs()],
ok.
@@ -189,7 +201,7 @@ handle_call({stop, Reason}, _From, #state{state = running} = State) ->
state_info = info_update(reason, Reason, State#state.state_info)
},
ok = mem3_shard_split_store:store_state(State1),
- ok = stop_jobs(<<"Shard splitting disabled">>, State1),
+ [kill_job_int(Job, State1) || Job <- running_jobs()],
{reply, ok, State1};
handle_call({stop, _}, _From, State) ->
@@ -224,6 +236,43 @@ handle_call({start_job, #job{id = Id, source = Source} = Job}, _From, State) ->
{reply, SourceError, State}
end;
+handle_call({resume_job, _}, _From, #state{state = stopped} = State) ->
+ case couch_util:get_value(reason, State#state.state_info) of
+ undefined ->
+ {reply, {error, stopped}, State};
+ Reason ->
+ {reply, {error, {stopped, Reason}}, State}
+ end;
+
+handle_call({resume_job, Id}, _From, State) ->
+ couch_log:notice("~p resume_job call ~p", [?MODULE, Id]),
+ case job_by_id(Id) of
+ #job{job_state = stopped} = Job ->
+ case start_job_int(Job, State) of
+ ok ->
+ {reply, ok, State};
+ {error, Error} ->
+ {reply, {error, Error}, State}
+ end;
+ #job{} ->
+ {reply, ok, State};
+ not_found ->
+ {reply, {error, not_found}, State}
+ end;
+
+handle_call({stop_job, Id, Reason}, _From, State) ->
+ couch_log:notice("~p stop_job Id:~p Reason:~p", [?MODULE, Id, Reason]),
+ case job_by_id(Id) of
+ #job{job_state = running} = Job ->
+ ok = stop_job_int(Job, stopped, Reason, State),
+ {reply, ok, State};
+ #job{} ->
+ {reply, ok, State};
+ not_found ->
+ {reply, {error, not_found}, State}
+ end;
+
+
handle_call({remove_job, Id}, _From, State) ->
couch_log:notice("~p call remove_job Id:~p", [?MODULE, Id]),
case job_by_id(Id) of
@@ -243,15 +292,24 @@ handle_call({report, Job}, {FromPid, _}, State) ->
report_int(Job, FromPid),
{reply, ok, State};
-handle_call(get_state, _From, #state{} = State) ->
+handle_call(get_state, _From, #state{state = GlobalState} = State) ->
StateProps = mem3_shard_split_store:state_to_ejson_props(State),
+ Stats0 = #{running => 0, completed => 0, failed => 0, stopped => 0},
+ StateStats = ets:foldl(fun(#job{job_state = JobState}, Acc) ->
+ % When jobs are disabled globally their state is not checkpointed as
+ % "stopped"; it stays "running". When returning the state we don't want
+ % to mislead by reporting "N running jobs" while the global state is
+ % "stopped".
+ JobState1 = case GlobalState =:= stopped andalso JobState =:= running of
+ true -> stopped;
+ false -> JobState
+ end,
+ Acc#{JobState1 => maps:get(JobState1, Acc, 0) + 1}
+ end, Stats0, ?MODULE),
Total = ets:info(?MODULE, size),
- Running = mem3_shard_split_job_sup:count_children(),
- StateProps1 = StateProps ++ [
- {jobs_total, Total},
- {jobs_running, Running}
- ],
- {reply, {StateProps1}, State};
+ StateStats1 = maps:to_list(StateStats) ++ [{total, Total}],
+ Result = {lists:sort(StateProps ++ StateStats1)},
+ {reply, Result, State};
handle_call(reset_state, _From, State) ->
{reply, ok, reset_state(State)};
@@ -308,7 +366,7 @@ reload_jobs(State) ->
lists:foldl(fun reload_job/2, State, Jobs).
-% This is a case when main application is stopped but a job is reload that was
+% This is the case when the main application is stopped but a job is reloaded that was
% checkpointed in running state. Set that state to stopped to avoid the API
% results looking odd.
-spec reload_job(#job{}, #state{}) -> #state{}.
@@ -325,9 +383,8 @@ reload_job(#job{job_state = running} = Job, #state{state = stopped} = State) ->
true = ets:insert(?MODULE, Job1),
State;
-% This is a case when a job process is spawned.
-reload_job(#job{job_state = JSt} = Job, #state{state = running} = State)
- when JSt =:= running orelse JSt =:= stopped ->
+% This is the case when a job process should be spawned.
+reload_job(#job{job_state = running} = Job, #state{state = running} = State) ->
case start_job_int(Job, State) of
ok ->
State;
@@ -337,7 +394,8 @@ reload_job(#job{job_state = JSt} = Job, #state{state = running} = State)
State
end;
-% The default case is to just load the job into the ets table.
+% The default case is to just load the job into the ets table. This would be
+% failed or completed jobs, for example.
reload_job(#job{} = Job, #state{} = State) ->
true = ets:insert(?MODULE, Job),
State.
@@ -384,34 +442,41 @@ spawn_job(#job{} = Job0) ->
end.
--spec stop_jobs(term(), #state{}) -> ok.
-stop_jobs(Reason, State) ->
- couch_log:notice("~p stop_jobs reason:~p", [?MODULE, Reason]),
- [stop_job_int(Job, stopped, Reason, State) || Job <- running_jobs()],
- ok.
-
-
-spec stop_job_int(#job{}, job_state(), term(), #state{}) -> ok.
stop_job_int(#job{pid = undefined}, _JobState, _Reason, _State) ->
ok;
stop_job_int(#job{} = Job, JobState, Reason, State) ->
couch_log:info("~p stop_job_int ~p : ~p", [?MODULE, jobfmt(Job), Reason]),
- ok = mem3_shard_split_job_sup:terminate_child(Job#job.pid),
- demonitor(Job#job.ref, [flush]),
- Job1 = Job#job{
- pid = undefined,
- ref = undefined,
+ Job1 = kill_job_int(Job, State),
+ Job2 = Job1#job{
job_state = JobState,
time_updated = now_sec(),
state_info = [{reason, Reason}]
},
- ok = mem3_shard_split_store:store_job(State, Job1),
+ ok = mem3_shard_split_store:store_job(State, Job2),
- true = ets:insert(?MODULE, Job1),
- couch_log:info("~p stop_job_int stopped ~p", [?MODULE, jobfmt(Job1)]),
+ couch_log:info("~p stop_job_int stopped ~p", [?MODULE, jobfmt(Job2)]),
ok.
+-spec kill_job_int(#job{}, #state{}) -> #job{}.
+kill_job_int(#job{pid = undefined} = Job, _State) ->
+ Job;
+
+kill_job_int(#job{pid = Pid, ref = Ref} = Job, _State) ->
+ couch_log:info("~p kill_job_int ~p", [?MODULE, jobfmt(Job)]),
+ case erlang:is_process_alive(Pid) of
+ true ->
+ ok = mem3_shard_split_job_sup:terminate_child(Pid);
+ false ->
+ ok
+ end,
+ demonitor(Ref, [flush]),
+ Job1 = Job#job{pid = undefined, ref = undefined},
+ true = ets:insert(?MODULE, Job1),
+ Job1.
+
+
-spec handle_job_exit(#job{}, term(), #state{}) -> ok.
handle_job_exit(#job{split_state = completed} = Job, normal, State) ->
couch_log:notice("~p completed job ~s exited", [?MODULE, Job#job.id]),
diff --git a/src/mem3/src/mem3_shard_split_httpd.erl b/src/mem3/src/mem3_shard_split_httpd.erl
index 0d1b6dea3..3d23cb4c0 100644
--- a/src/mem3/src/mem3_shard_split_httpd.erl
+++ b/src/mem3/src/mem3_shard_split_httpd.erl
@@ -29,53 +29,77 @@
-include_lib("couch/include/couch_db.hrl").
+
+-define(JOBS, <<"jobs">>).
+-define(STATE, <<"state">>).
+-define(S_RUNNING, <<"running">>).
+-define(S_STOPPED, <<"stopped">>).
+
% GET /_shard_splits
%
handle_shard_splits_req(#httpd{method='GET', path_parts=[_]} = Req) ->
- State = get_state(),
+ State = get_summary(),
send_json(Req, State);
-% PUT /_shard_splits {"start": true} | {"stop":"Reason..."}
+handle_shard_splits_req(#httpd{path_parts=[_]} = Req) ->
+ send_method_not_allowed(Req, "GET,HEAD");
+
+
+% GET /_shard_splits/state
+%
+handle_shard_splits_req(#httpd{method='GET',
+ path_parts=[_, ?STATE]} = Req) ->
+ State = get_shard_splitting_state(),
+ send_json(Req, {[{state, State}]});
+
+% PUT /_shard_splits/state
%
-handle_shard_splits_req(#httpd{method='PUT', path_parts=[_]} = Req) ->
+handle_shard_splits_req(#httpd{method='PUT',
+ path_parts=[_, ?STATE]} = Req) ->
couch_httpd:validate_ctype(Req, "application/json"),
{Props} = couch_httpd:json_body_obj(Req),
- Start = couch_util:get_value(<<"start">>, Props),
- Stop = couch_util:get_value(<<"stop">>, Props),
- case {Start, Stop} of
- {undefined, undefined} ->
- throw({bad_request, <<"Expected a `start` or `stop` field">>});
- {Start, undefined} when Start =/= undefined ->
+ State = couch_util:get_value(<<"state">>, Props),
+ Reason = couch_util:get_value(<<"reason">>, Props),
+ case {State, Reason} of
+ {undefined, _} ->
+ throw({bad_request, <<"Expected a `state` field">>});
+ {?S_RUNNING, _} ->
case start_shard_splitting() of
{ok, JsonResult} ->
send_json(Req, 200, JsonResult);
{error, JsonError} ->
send_json(Req, 500, JsonError)
end;
- {undefined, Reason} when is_binary(Reason) orelse Reason =:= true ->
- case stop_shard_splitting(Reason) of
+ {?S_STOPPED, Reason} ->
+ Reason1 = case Reason of
+ undefined -> <<"Shard splitting stopped on the cluster by user">>;
+ _ -> Reason
+ end,
+ case stop_shard_splitting(Reason1) of
{ok, JsonResult} ->
send_json(Req, 200, JsonResult);
{error, JsonError} ->
send_json(Req, 500, JsonError)
end;
{_, _} ->
- throw({bad_request, <<"Invalid `start` or `stop` fields">>})
- end;
+ throw({bad_request, <<"State field not `running` or `stopped`">>})
+ end;
-handle_shard_splits_req(#httpd{path_parts=[_]} = Req) ->
+
+handle_shard_splits_req(#httpd{path_parts=[_, ?STATE]} = Req) ->
send_method_not_allowed(Req, "GET,HEAD,PUT");
+
% GET /_shard_splits/jobs
%
-handle_shard_splits_req(#httpd{method='GET', path_parts=[_,<<"jobs">>]}=Req) ->
+handle_shard_splits_req(#httpd{method='GET', path_parts=[_,?JOBS]}=Req) ->
Jobs = get_jobs(),
Total = length(Jobs),
send_json(Req, {[{total_rows, Total}, {offset, 0}, {jobs, Jobs}]});
% POST /_shard_splits/jobs {"node": "...", "shard": "..."}
%
-handle_shard_splits_req(#httpd{method = 'POST', path_parts=[_,<<"jobs">>]} = Req) ->
+handle_shard_splits_req(#httpd{method = 'POST',
+ path_parts=[_, ?JOBS]} = Req) ->
couch_httpd:validate_ctype(Req, "application/json"),
JobProps = couch_httpd:json_body_obj(Req),
Node = get_jobs_post_node(JobProps),
@@ -93,12 +117,14 @@ handle_shard_splits_req(#httpd{method = 'POST', path_parts=[_,<<"jobs">>]} = Req
send_json(Req, 500, {[{error, to_binary(Error)}]})
end;
-handle_shard_splits_req(#httpd{path_parts=[_,<<"jobs">>]} = Req) ->
+handle_shard_splits_req(#httpd{path_parts=[_, ?JOBS]} = Req) ->
send_method_not_allowed(Req, "GET,HEAD,POST");
-% GET /_shard_splits/jobs/<jobid>
+
+% GET /_shard_splits/jobs/$jobid
%
-handle_shard_splits_req(#httpd{method='GET', path_parts=[_,<<"jobs">>,JobId]}=Req) ->
+handle_shard_splits_req(#httpd{method='GET',
+ path_parts=[_, ?JOBS, JobId]}=Req) ->
case get_job(JobId) of
{ok, JobInfo} ->
send_json(Req, JobInfo);
@@ -106,9 +132,10 @@ handle_shard_splits_req(#httpd{method='GET', path_parts=[_,<<"jobs">>,JobId]}=Re
throw(not_found)
end;
-% DELETE /_shard_splits/jobs/<jobid>
+% DELETE /_shard_splits/jobs/$jobid
%
-handle_shard_splits_req(#httpd{method='DELETE', path_parts=[_,<<"jobs">>,JobId]}=Req) ->
+handle_shard_splits_req(#httpd{method='DELETE',
+ path_parts=[_, ?JOBS, JobId]}=Req) ->
case get_job(JobId) of
{ok, {Props}} ->
NodeBin = couch_util:get_value(node, Props),
@@ -116,7 +143,7 @@ handle_shard_splits_req(#httpd{method='DELETE', path_parts=[_,<<"jobs">>,JobId]}
case rpc:call(Node, mem3_shard_split, remove_job, [JobId]) of
ok ->
send_json(Req, 200, {[{ok, true}]});
- not_found ->
+ {error, not_found} ->
throw(not_found)
end;
{error, not_found} ->
@@ -124,8 +151,62 @@ handle_shard_splits_req(#httpd{method='DELETE', path_parts=[_,<<"jobs">>,JobId]}
end;
-handle_shard_splits_req(#httpd{path_parts=[_,<<"jobs">>,_]} = Req) ->
- send_method_not_allowed(Req, "GET,HEAD,DELETE").
+handle_shard_splits_req(#httpd{path_parts=[_, ?JOBS, _]} = Req) ->
+ send_method_not_allowed(Req, "GET,HEAD,DELETE");
+
+
+% GET /_shard_splits/jobs/$jobid/state
+%
+
+handle_shard_splits_req(#httpd{method='GET',
+ path_parts=[_, ?JOBS, JobId, ?STATE]} = Req) ->
+ case get_job(JobId) of
+ {ok, {Props}} ->
+ JobState = couch_util:get_value(job_state, Props),
+ send_json(Req, 200, {[{state, JobState}]});
+ {error, not_found} ->
+ throw(not_found)
+ end;
+
+% PUT /_shard_splits/jobs/$jobid/state
+
+handle_shard_splits_req(#httpd{method='PUT',
+ path_parts=[_, ?JOBS, JobId, ?STATE]} = Req) ->
+ couch_httpd:validate_ctype(Req, "application/json"),
+ {Props} = couch_httpd:json_body_obj(Req),
+ State = couch_util:get_value(<<"state">>, Props),
+ Reason = couch_util:get_value(<<"reason">>, Props),
+ case {State, Reason} of
+ {undefined, _} ->
+ throw({bad_request, <<"Expected a `state` field">>});
+ {?S_RUNNING, _} ->
+ case resume_job(JobId) of
+ ok ->
+ send_json(Req, 200, {[{ok, true}]});
+ {error, not_found} ->
+ throw(not_found);
+ {error, JsonError} ->
+ send_json(Req, 500, JsonError)
+ end;
+ {?S_STOPPED, Reason} ->
+ Reason1 = case Reason of
+ undefined -> <<"Stopped by user">>;
+ _ -> Reason
+ end,
+ case stop_job(JobId, Reason1) of
+ ok ->
+ send_json(Req, 200, {[{ok, true}]});
+ {error, not_found} ->
+ throw(not_found);
+ {error, JsonError} ->
+ send_json(Req, 500, JsonError)
+ end;
+ {_, _} ->
+ throw({bad_request, <<"State field not `running` or `stopped`">>})
+ end;
+
+handle_shard_splits_req(#httpd{path_parts=[_, ?JOBS, _, ?STATE]} = Req) ->
+ send_method_not_allowed(Req, "GET,HEAD,PUT").
% Private helper functions
@@ -175,23 +256,69 @@ get_job(JobId) ->
end.
-get_state() ->
+resume_job(JobId) ->
+ Nodes = mem3_util:live_nodes(),
+ {Replies, _Bad} = rpc:multicall(Nodes, mem3_shard_split, resume_job,
+ [JobId]),
+ WithoutNotFound = [R || R <- Replies, R =/= {error, not_found}],
+ case lists:usort(WithoutNotFound) of
+ [ok] ->
+ ok;
+ [{error, Error} | _] ->
+ {error, {[{error, to_binary(Error)}]}};
+ [] ->
+ {error, not_found}
+ end.
+
+
+stop_job(JobId, Reason) ->
+ Nodes = mem3_util:live_nodes(),
+ {Replies, _Bad} = rpc:multicall(Nodes, mem3_shard_split, stop_job,
+ [JobId, Reason]),
+ WithoutNotFound = [R || R <- Replies, R =/= {error, not_found}],
+ case lists:usort(WithoutNotFound) of
+ [ok] ->
+ ok;
+ [{error, Error} | _] ->
+ {error, {[{error, to_binary(Error)}]}};
+ [] ->
+ {error, not_found}
+ end.
+
+
+get_summary() ->
Nodes = mem3_util:live_nodes(),
{Replies, _Bad} = rpc:multicall(Nodes, mem3_shard_split, get_state, []),
- Acc = lists:foldl(fun({Res}, {RAcc, TAcc, States}) ->
- Running = couch_util:get_value(jobs_running, Res, 0),
- Total = couch_util:get_value(jobs_total, Res, 0),
- Node = couch_util:get_value(node, Res),
- State = couch_util:get_value(state, Res),
- States1 = orddict:append(State, Node, States),
- {Running + RAcc, Total + TAcc, States1}
- end, {0, 0, orddict:new()}, Replies),
- {Running, Total, States} = Acc,
- {[
- {jobs_running, Running},
- {job_total, Total},
- {states, {orddict:to_list(States)}}
- ]}.
+ Stats0 = #{running => 0, total => 0, completed => 0, failed => 0,
+ stopped => 0},
+ {Stats, States} = lists:foldl(fun({Res}, {Stats, States}) ->
+ Stats1 = maps:map(fun(Stat, OldVal) ->
+ OldVal + couch_util:get_value(Stat, Res, 0)
+ end, Stats),
+ NewStates = [couch_util:get_value(state, Res) | States],
+ {Stats1, NewStates}
+ end, {Stats0, []}, Replies),
+ State = case lists:member(?S_RUNNING, States) of
+ true -> running;
+ false -> stopped
+ end,
+ {[{state, State}] ++ lists:sort(maps:to_list(Stats))}.
+
+
+get_shard_splitting_state() ->
+ Nodes = mem3_util:live_nodes(),
+ {Replies, _Bad} = rpc:multicall(Nodes, mem3_shard_split, get_state, []),
+ Running = lists:foldl(fun({Res}, R) ->
+ case couch_util:get_value(state, Res) of
+ ?S_RUNNING -> R + 1;
+ _ -> R
+ end
+ end, 0, Replies),
+ % If at least one node is "running", then overall state is "running"
+ case Running > 0 of
+ true -> running;
+ false -> stopped
+ end.
start_shard_splitting() ->