diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2019-02-04 19:13:57 -0500 |
---|---|---|
committer | Nick Vatamaniuc <vatamane@apache.org> | 2019-02-15 16:42:43 -0500 |
commit | 0432c6032aa47c07b4b6b06b3fe13c11854e40d2 (patch) | |
tree | fa92381b27c22ea818a3cce3221efdc988edd2ba | |
parent | ce5e4dbf5cbad1b57fe5968390146f61f997aa64 (diff) | |
download | couchdb-0432c6032aa47c07b4b6b06b3fe13c11854e40d2.tar.gz |
Implement job state history
(As discussed in the couchdb-dev mailing list)
It returns a list of [{"state":"$statename", "ts":"$8601_timestamp"}] entries
The entry combined both the job history as well as the split state / phase.
Separating them might represent better how the code works, but it didn't look
very clean from the API point of view.
-rw-r--r-- | src/mem3/src/mem3_reshard.erl | 53 | ||||
-rw-r--r-- | src/mem3/src/mem3_reshard.hrl | 5 | ||||
-rw-r--r-- | src/mem3/src/mem3_reshard_httpd_util.erl | 26 | ||||
-rw-r--r-- | src/mem3/src/mem3_reshard_job.erl | 11 | ||||
-rw-r--r-- | src/mem3/src/mem3_reshard_store.erl | 54 |
5 files changed, 99 insertions, 50 deletions
diff --git a/src/mem3/src/mem3_reshard.erl b/src/mem3/src/mem3_reshard.erl index c9cd29b9a..98b0fbe7e 100644 --- a/src/mem3/src/mem3_reshard.erl +++ b/src/mem3/src/mem3_reshard.erl @@ -30,6 +30,7 @@ reset_state/0, db_monitor/1, now_sec/0, + update_history/3, start_link/0, @@ -48,6 +49,7 @@ -define(JOB_ID_VERSION, 1). -define(JOB_STATE_VERSION, 1). -define(DEFAULT_MAX_JOBS, 25). +-define(DEFAULT_MAX_HISTORY, 20). -define(JOB_PREFIX, <<"reshard-job-">>). -define(STATE_PREFIX, <<"reshard-state-">>). @@ -69,11 +71,12 @@ start_split_job(#shard{} = Source, Split) -> Targets = target_shards(Source, Split), case mem3_reshard_validate:targets(Source, Targets) of ok -> + TStamp = now_sec(), Job = #job{ type = split, job_state = new, split_state = new, - time_created = now_sec(), + state_history = [{new, TStamp}], node = node(), source = Source, targets = Targets @@ -106,7 +109,8 @@ remove_job(JobId) when is_binary(JobId) -> -spec jobs() -> [[tuple()]]. jobs() -> ets:foldl(fun(Job, Acc) -> - Props = mem3_reshard_store:job_to_ejson_props(Job), + Opts = [iso8601], + Props = mem3_reshard_store:job_to_ejson_props(Job, Opts), [{Props} | Acc] end, [], ?MODULE). @@ -115,7 +119,8 @@ jobs() -> job(JobId) -> case job_by_id(JobId) of #job{} = Job -> - Props = mem3_reshard_store:job_to_ejson_props(Job), + Opts = [iso8601], + Props = mem3_reshard_store:job_to_ejson_props(Job, Opts), {ok, {Props}}; not_found -> {error, not_found} @@ -382,7 +387,7 @@ reload_job(#job{job_state = running} = Job, #state{state = stopped} = State) -> pid = undefined, ref = undefined }, - true = ets:insert(?MODULE, Job1), + true = ets:insert(?MODULE, update_job_state_history(Job1)), State; % This is a case when a job process should be spawend @@ -417,8 +422,9 @@ get_start_delay_sec() -> start_job_int(Job, State) -> case spawn_job(Job) of {ok, #job{} = Job1} -> - ok = mem3_reshard_store:store_job(State, Job1), - true = ets:insert(?MODULE, Job1), + Job2 = update_job_state_history(Job1), + ok = mem3_reshard_store:store_job(State, Job2), + true = ets:insert(?MODULE, Job2), ok; {error, Error} -> {error, Error} @@ -489,8 +495,9 @@ handle_job_exit(#job{split_state = completed} = Job, normal, State) -> time_updated = now_sec(), state_info = [] }, - ok = mem3_reshard_store:store_job(State, Job1), - true = ets:insert(?MODULE, Job1), + Job2 = update_job_state_history(Job1), + ok = mem3_reshard_store:store_job(State, Job2), + true = ets:insert(?MODULE, Job2), ok; handle_job_exit(#job{job_state = running} = Job, normal, _State) -> @@ -503,7 +510,7 @@ handle_job_exit(#job{job_state = running} = Job, normal, _State) -> time_updated = now_sec(), state_info = info_update(reason, <<"Job stopped">>, OldInfo) }, - true = ets:insert(?MODULE, Job1), + true = ets:insert(?MODULE, update_job_state_history(Job1)), ok; handle_job_exit(#job{job_state = running} = Job, shutdown, _State) -> @@ -516,7 +523,7 @@ handle_job_exit(#job{job_state = running} = Job, shutdown, _State) -> time_updated = now_sec(), state_info = info_update(reason, <<"Job shutdown">>, OldInfo) }, - true = ets:insert(?MODULE, Job1), + true = ets:insert(?MODULE, update_job_state_history(Job1)), ok; handle_job_exit(#job{job_state = running} = Job, {shutdown, Msg}, _State) -> @@ -529,7 +536,7 @@ handle_job_exit(#job{job_state = running} = Job, {shutdown, Msg}, _State) -> time_updated = now_sec(), state_info = info_update(reason, <<"Job shutdown">>, OldInfo) }, - true = ets:insert(?MODULE, Job1), + true = ets:insert(?MODULE, update_job_state_history(Job1)), ok; handle_job_exit(#job{} = Job, Error, State) -> @@ -542,8 +549,9 @@ handle_job_exit(#job{} = Job, Error, State) -> time_updated = now_sec(), state_info = info_update(reason, Error, OldInfo) }, - ok = mem3_reshard_store:store_job(State, Job1), - true = ets:insert(?MODULE, Job1), + Job2 = update_job_state_history(Job1), + ok = mem3_reshard_store:store_job(State, Job2), + true = ets:insert(?MODULE, Job2), ok. @@ -731,6 +739,24 @@ iso8601(UnixSec) -> lists:flatten(io_lib:format(Format, [Y, Mon, D, H, Min, S])). +-spec update_job_state_history(#job{}) -> #job{}. +update_job_state_history(#job{job_state = St, time_updated = Ts} = Job) -> + Hist = Job#job.state_history, + Job#job{state_history = update_history(St, Ts, Hist)}. + + +-spec update_history(atom(), time_sec(), list()) -> list(). +update_history(State, Ts, [{State, _} | Rest]) -> + lists:sublist([{State, Ts} | Rest], max_history()); + +update_history(State, Ts, History) -> + lists:sublist([{State, Ts} | History], max_history()). + + +-spec max_history() -> non_neg_integer(). +max_history() -> + config:get_integer("reshard", "max_history", ?DEFAULT_MAX_HISTORY). + -spec statefmt(#state{} | term()) -> string(). statefmt(#state{} = State) -> @@ -751,7 +777,6 @@ statefmt(State) -> lists:flatten(Fmt). - -spec jobfmt(#job{}) -> string(). jobfmt(#job{} = Job) -> mem3_reshard_job:jobfmt(Job). diff --git a/src/mem3/src/mem3_reshard.hrl b/src/mem3/src/mem3_reshard.hrl index bae6feffa..198780673 100644 --- a/src/mem3/src/mem3_reshard.hrl +++ b/src/mem3/src/mem3_reshard.hrl @@ -17,6 +17,7 @@ -type split() :: pos_integer(). % also power of 2 -type job_id() :: binary() | undefined. -type job_type() :: split. +-type time_sec() :: non_neg_integer(). -type shard_split_main_state() :: running | @@ -50,8 +51,8 @@ targets :: [#shard{}], job_state :: job_state(), split_state :: split_state(), - state_info = []:: [{atom(), any()}], - time_created :: non_neg_integer(), + state_info = [] :: [{atom(), any()}], + state_history = [] :: [{atom(), time_sec()}], time_started = 0 :: non_neg_integer(), time_updated = 0 :: non_neg_integer(), node :: node(), diff --git a/src/mem3/src/mem3_reshard_httpd_util.erl b/src/mem3/src/mem3_reshard_httpd_util.erl index 8c99b1946..684135a96 100644 --- a/src/mem3/src/mem3_reshard_httpd_util.erl +++ b/src/mem3/src/mem3_reshard_httpd_util.erl @@ -156,7 +156,7 @@ pick_shards(_, Db, Shard, _) when is_binary(Db), is_binary(Shard) -> get_jobs() -> Nodes = mem3_util:live_nodes(), {Replies, _Bad} = rpc:multicall(Nodes, mem3_reshard, jobs, []), - [iso8160_timestamps(R) || R <- lists:flatten(Replies)]. + lists:flatten(Replies). get_job(JobId) -> @@ -164,7 +164,7 @@ get_job(JobId) -> {Replies, _Bad} = rpc:multicall(Nodes, mem3_reshard, job, [JobId]), case [JobInfo || {ok, JobInfo} <- Replies] of [JobInfo | _] -> - {ok, iso8160_timestamps(JobInfo)}; + {ok, JobInfo}; [] -> {error, not_found} end. @@ -275,25 +275,3 @@ stop_shard_splitting(Reason) -> [Error | _] -> {error, {[{error, couch_util:to_binary(Error)}]}} end. - - -iso8160_timestamps({Props}) -> - NewProps = lists:map(fun - ({time_created, UnixSec}) -> - {time_created, unix_to_iso8601(UnixSec)}; - ({time_updated, UnixSec}) -> - {time_updated, unix_to_iso8601(UnixSec)}; - ({time_started, UnixSec}) -> - {time_started, unix_to_iso8601(UnixSec)}; - ({K, V}) -> - {K, V} - end, Props), - {NewProps}. - - -unix_to_iso8601(UnixSec) -> - Mega = UnixSec div 1000000, - Sec = UnixSec rem 1000000, - {{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time({Mega, Sec, 0}), - Format = "~B-~2..0B-~2..0BT~2..0B:~2..0B:~2..0BZ", - iolist_to_binary(io_lib:format(Format, [Y, Mon, D, H, Min, S])). diff --git a/src/mem3/src/mem3_reshard_job.erl b/src/mem3/src/mem3_reshard_job.erl index 6d2a438f7..bd4d89a6d 100644 --- a/src/mem3/src/mem3_reshard_job.erl +++ b/src/mem3/src/mem3_reshard_job.erl @@ -207,9 +207,10 @@ switch_state(#job{manager = ManagerPid} = Job0, NewState) -> state_info = Info2, workers = [] }, - ok = mem3_reshard:checkpoint(ManagerPid, check_state(Job)), + Job1 = update_split_state_history(Job), + ok = mem3_reshard:checkpoint(ManagerPid, check_state(Job1)), gen_server:cast(self(), do_state), - Job. + Job1. -spec do_state(#job{}) -> #job{}. @@ -501,3 +502,9 @@ reset_targets(#job{source = Source, targets = Targets} = Job) -> end end, Targets), Job. + + +-spec update_split_state_history(#job{}) -> #job{}. +update_split_state_history(#job{split_state = St, time_updated = Ts} = Job) -> + Hist = Job#job.state_history, + Job#job{state_history = mem3_reshard:update_history(St, Ts, Hist)}. diff --git a/src/mem3/src/mem3_reshard_store.erl b/src/mem3/src/mem3_reshard_store.erl index 52d50bc55..c6ea94582 100644 --- a/src/mem3/src/mem3_reshard_store.erl +++ b/src/mem3/src/mem3_reshard_store.erl @@ -25,7 +25,7 @@ load_state/1, delete_state/1, % for debugging - job_to_ejson_props/1, + job_to_ejson_props/2, state_to_ejson_props/1 ]). @@ -173,7 +173,21 @@ load_doc(Db, DocId) -> end. -job_to_ejson_props(#job{source = Source, targets = Targets} = Job) -> +job_to_ejson_props(#job{} = Job) -> + job_to_ejson_props(Job, []). + + +job_to_ejson_props(#job{source = Source, targets = Targets} = Job, Opts) -> + Iso8601 = proplists:get_value(iso8601, Opts), + History = state_history_to_ejson(Job#job.state_history, Iso8601), + TimeStarted = case Iso8601 of + true -> unix_to_iso8601(Job#job.time_started); + _ -> Job#job.time_started + end, + TimeUpdated = case Iso8601 of + true -> unix_to_iso8601(Job#job.time_updated); + _ -> Job#job.time_updated + end, [ {id, Job#job.id}, {type, Job#job.type}, @@ -183,9 +197,9 @@ job_to_ejson_props(#job{source = Source, targets = Targets} = Job) -> {split_state, Job#job.split_state}, {state_info, state_info_to_ejson(Job#job.state_info)}, {node, atom_to_binary(Job#job.node, utf8)}, - {time_created, Job#job.time_created}, - {time_started, Job#job.time_started}, - {time_updated, Job#job.time_updated} + {time_started, TimeStarted}, + {time_updated, TimeUpdated}, + {state_history, History} ]. @@ -197,9 +211,9 @@ job_from_ejson({Props}) -> JobState = couch_util:get_value(<<"job_state">>, Props), SplitState = couch_util:get_value(<<"split_state">>, Props), StateInfo = couch_util:get_value(<<"state_info">>, Props), - TCreated = couch_util:get_value(<<"time_created">>, Props), TStarted = couch_util:get_value(<<"time_started">>, Props), TUpdated = couch_util:get_value(<<"time_updated">>, Props), + History = couch_util:get_value(<<"state_history">>, Props), #job{ id = Id, type = binary_to_atom(Type, utf8), @@ -207,11 +221,11 @@ job_from_ejson({Props}) -> split_state = binary_to_atom(SplitState, utf8), state_info = state_info_from_ejson(StateInfo), node = node(), - time_created = TCreated, time_started = TStarted, time_updated = TUpdated, source = mem3_reshard:shard_from_name(Source), - targets = [mem3_reshard:shard_from_name(T) || T <- Targets] + targets = [mem3_reshard:shard_from_name(T) || T <- Targets], + state_history = state_history_from_ejson(History) }. @@ -242,9 +256,33 @@ state_info_from_ejson({Props}) -> lists:sort(Props1). +state_history_to_ejson(History, true) when is_list(History) -> + [{[{state, S}, {ts, unix_to_iso8601(T)}]} || {S, T} <- History]; + +state_history_to_ejson(History, _) when is_list(History) -> + [{[{state, S}, {ts, T}]} || {S, T} <- History]. + + + +state_history_from_ejson(HistoryEJson) when is_list(HistoryEJson) -> + lists:map(fun({EventProps}) -> + State = couch_util:get_value(<<"state">>, EventProps), + Timestamp = couch_util:get_value(<<"ts">>, EventProps), + {binary_to_atom(State, utf8), Timestamp} + end, HistoryEJson). + + state_info_to_ejson(Props) -> {lists:sort([{K, couch_util:to_binary(V)} || {K, V} <- Props])}. store_state() -> config:get_boolean("mem3_reshard", "store_state", true). + + +unix_to_iso8601(UnixSec) -> + Mega = UnixSec div 1000000, + Sec = UnixSec rem 1000000, + {{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time({Mega, Sec, 0}), + Format = "~B-~2..0B-~2..0BT~2..0B:~2..0B:~2..0BZ", + iolist_to_binary(io_lib:format(Format, [Y, Mon, D, H, Min, S])). |