summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2019-02-04 19:13:57 -0500
committerNick Vatamaniuc <vatamane@apache.org>2019-02-15 16:42:43 -0500
commit0432c6032aa47c07b4b6b06b3fe13c11854e40d2 (patch)
treefa92381b27c22ea818a3cce3221efdc988edd2ba
parentce5e4dbf5cbad1b57fe5968390146f61f997aa64 (diff)
downloadcouchdb-0432c6032aa47c07b4b6b06b3fe13c11854e40d2.tar.gz
Implement job state history
(As discussed in the couchdb-dev mailing list) It returns a list of [{"state":"$statename", "ts":"$8601_timestamp"}] entries The entry combined both the job history as well as the split state / phase. Separating them might represent better how the code works, but it didn't look very clean from the API point of view.
-rw-r--r--src/mem3/src/mem3_reshard.erl53
-rw-r--r--src/mem3/src/mem3_reshard.hrl5
-rw-r--r--src/mem3/src/mem3_reshard_httpd_util.erl26
-rw-r--r--src/mem3/src/mem3_reshard_job.erl11
-rw-r--r--src/mem3/src/mem3_reshard_store.erl54
5 files changed, 99 insertions, 50 deletions
diff --git a/src/mem3/src/mem3_reshard.erl b/src/mem3/src/mem3_reshard.erl
index c9cd29b9a..98b0fbe7e 100644
--- a/src/mem3/src/mem3_reshard.erl
+++ b/src/mem3/src/mem3_reshard.erl
@@ -30,6 +30,7 @@
reset_state/0,
db_monitor/1,
now_sec/0,
+ update_history/3,
start_link/0,
@@ -48,6 +49,7 @@
-define(JOB_ID_VERSION, 1).
-define(JOB_STATE_VERSION, 1).
-define(DEFAULT_MAX_JOBS, 25).
+-define(DEFAULT_MAX_HISTORY, 20).
-define(JOB_PREFIX, <<"reshard-job-">>).
-define(STATE_PREFIX, <<"reshard-state-">>).
@@ -69,11 +71,12 @@ start_split_job(#shard{} = Source, Split) ->
Targets = target_shards(Source, Split),
case mem3_reshard_validate:targets(Source, Targets) of
ok ->
+ TStamp = now_sec(),
Job = #job{
type = split,
job_state = new,
split_state = new,
- time_created = now_sec(),
+ state_history = [{new, TStamp}],
node = node(),
source = Source,
targets = Targets
@@ -106,7 +109,8 @@ remove_job(JobId) when is_binary(JobId) ->
-spec jobs() -> [[tuple()]].
jobs() ->
ets:foldl(fun(Job, Acc) ->
- Props = mem3_reshard_store:job_to_ejson_props(Job),
+ Opts = [iso8601],
+ Props = mem3_reshard_store:job_to_ejson_props(Job, Opts),
[{Props} | Acc]
end, [], ?MODULE).
@@ -115,7 +119,8 @@ jobs() ->
job(JobId) ->
case job_by_id(JobId) of
#job{} = Job ->
- Props = mem3_reshard_store:job_to_ejson_props(Job),
+ Opts = [iso8601],
+ Props = mem3_reshard_store:job_to_ejson_props(Job, Opts),
{ok, {Props}};
not_found ->
{error, not_found}
@@ -382,7 +387,7 @@ reload_job(#job{job_state = running} = Job, #state{state = stopped} = State) ->
pid = undefined,
ref = undefined
},
- true = ets:insert(?MODULE, Job1),
+ true = ets:insert(?MODULE, update_job_state_history(Job1)),
State;
% This is a case when a job process should be spawend
@@ -417,8 +422,9 @@ get_start_delay_sec() ->
start_job_int(Job, State) ->
case spawn_job(Job) of
{ok, #job{} = Job1} ->
- ok = mem3_reshard_store:store_job(State, Job1),
- true = ets:insert(?MODULE, Job1),
+ Job2 = update_job_state_history(Job1),
+ ok = mem3_reshard_store:store_job(State, Job2),
+ true = ets:insert(?MODULE, Job2),
ok;
{error, Error} ->
{error, Error}
@@ -489,8 +495,9 @@ handle_job_exit(#job{split_state = completed} = Job, normal, State) ->
time_updated = now_sec(),
state_info = []
},
- ok = mem3_reshard_store:store_job(State, Job1),
- true = ets:insert(?MODULE, Job1),
+ Job2 = update_job_state_history(Job1),
+ ok = mem3_reshard_store:store_job(State, Job2),
+ true = ets:insert(?MODULE, Job2),
ok;
handle_job_exit(#job{job_state = running} = Job, normal, _State) ->
@@ -503,7 +510,7 @@ handle_job_exit(#job{job_state = running} = Job, normal, _State) ->
time_updated = now_sec(),
state_info = info_update(reason, <<"Job stopped">>, OldInfo)
},
- true = ets:insert(?MODULE, Job1),
+ true = ets:insert(?MODULE, update_job_state_history(Job1)),
ok;
handle_job_exit(#job{job_state = running} = Job, shutdown, _State) ->
@@ -516,7 +523,7 @@ handle_job_exit(#job{job_state = running} = Job, shutdown, _State) ->
time_updated = now_sec(),
state_info = info_update(reason, <<"Job shutdown">>, OldInfo)
},
- true = ets:insert(?MODULE, Job1),
+ true = ets:insert(?MODULE, update_job_state_history(Job1)),
ok;
handle_job_exit(#job{job_state = running} = Job, {shutdown, Msg}, _State) ->
@@ -529,7 +536,7 @@ handle_job_exit(#job{job_state = running} = Job, {shutdown, Msg}, _State) ->
time_updated = now_sec(),
state_info = info_update(reason, <<"Job shutdown">>, OldInfo)
},
- true = ets:insert(?MODULE, Job1),
+ true = ets:insert(?MODULE, update_job_state_history(Job1)),
ok;
handle_job_exit(#job{} = Job, Error, State) ->
@@ -542,8 +549,9 @@ handle_job_exit(#job{} = Job, Error, State) ->
time_updated = now_sec(),
state_info = info_update(reason, Error, OldInfo)
},
- ok = mem3_reshard_store:store_job(State, Job1),
- true = ets:insert(?MODULE, Job1),
+ Job2 = update_job_state_history(Job1),
+ ok = mem3_reshard_store:store_job(State, Job2),
+ true = ets:insert(?MODULE, Job2),
ok.
@@ -731,6 +739,24 @@ iso8601(UnixSec) ->
lists:flatten(io_lib:format(Format, [Y, Mon, D, H, Min, S])).
+-spec update_job_state_history(#job{}) -> #job{}.
+update_job_state_history(#job{job_state = St, time_updated = Ts} = Job) ->
+ Hist = Job#job.state_history,
+ Job#job{state_history = update_history(St, Ts, Hist)}.
+
+
+-spec update_history(atom(), time_sec(), list()) -> list().
+update_history(State, Ts, [{State, _} | Rest]) ->
+ lists:sublist([{State, Ts} | Rest], max_history());
+
+update_history(State, Ts, History) ->
+ lists:sublist([{State, Ts} | History], max_history()).
+
+
+-spec max_history() -> non_neg_integer().
+max_history() ->
+ config:get_integer("reshard", "max_history", ?DEFAULT_MAX_HISTORY).
+
-spec statefmt(#state{} | term()) -> string().
statefmt(#state{} = State) ->
@@ -751,7 +777,6 @@ statefmt(State) ->
lists:flatten(Fmt).
-
-spec jobfmt(#job{}) -> string().
jobfmt(#job{} = Job) ->
mem3_reshard_job:jobfmt(Job).
diff --git a/src/mem3/src/mem3_reshard.hrl b/src/mem3/src/mem3_reshard.hrl
index bae6feffa..198780673 100644
--- a/src/mem3/src/mem3_reshard.hrl
+++ b/src/mem3/src/mem3_reshard.hrl
@@ -17,6 +17,7 @@
-type split() :: pos_integer(). % also power of 2
-type job_id() :: binary() | undefined.
-type job_type() :: split.
+-type time_sec() :: non_neg_integer().
-type shard_split_main_state() ::
running |
@@ -50,8 +51,8 @@
targets :: [#shard{}],
job_state :: job_state(),
split_state :: split_state(),
- state_info = []:: [{atom(), any()}],
- time_created :: non_neg_integer(),
+ state_info = [] :: [{atom(), any()}],
+ state_history = [] :: [{atom(), time_sec()}],
time_started = 0 :: non_neg_integer(),
time_updated = 0 :: non_neg_integer(),
node :: node(),
diff --git a/src/mem3/src/mem3_reshard_httpd_util.erl b/src/mem3/src/mem3_reshard_httpd_util.erl
index 8c99b1946..684135a96 100644
--- a/src/mem3/src/mem3_reshard_httpd_util.erl
+++ b/src/mem3/src/mem3_reshard_httpd_util.erl
@@ -156,7 +156,7 @@ pick_shards(_, Db, Shard, _) when is_binary(Db), is_binary(Shard) ->
get_jobs() ->
Nodes = mem3_util:live_nodes(),
{Replies, _Bad} = rpc:multicall(Nodes, mem3_reshard, jobs, []),
- [iso8160_timestamps(R) || R <- lists:flatten(Replies)].
+ lists:flatten(Replies).
get_job(JobId) ->
@@ -164,7 +164,7 @@ get_job(JobId) ->
{Replies, _Bad} = rpc:multicall(Nodes, mem3_reshard, job, [JobId]),
case [JobInfo || {ok, JobInfo} <- Replies] of
[JobInfo | _] ->
- {ok, iso8160_timestamps(JobInfo)};
+ {ok, JobInfo};
[] ->
{error, not_found}
end.
@@ -275,25 +275,3 @@ stop_shard_splitting(Reason) ->
[Error | _] ->
{error, {[{error, couch_util:to_binary(Error)}]}}
end.
-
-
-iso8160_timestamps({Props}) ->
- NewProps = lists:map(fun
- ({time_created, UnixSec}) ->
- {time_created, unix_to_iso8601(UnixSec)};
- ({time_updated, UnixSec}) ->
- {time_updated, unix_to_iso8601(UnixSec)};
- ({time_started, UnixSec}) ->
- {time_started, unix_to_iso8601(UnixSec)};
- ({K, V}) ->
- {K, V}
- end, Props),
- {NewProps}.
-
-
-unix_to_iso8601(UnixSec) ->
- Mega = UnixSec div 1000000,
- Sec = UnixSec rem 1000000,
- {{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time({Mega, Sec, 0}),
- Format = "~B-~2..0B-~2..0BT~2..0B:~2..0B:~2..0BZ",
- iolist_to_binary(io_lib:format(Format, [Y, Mon, D, H, Min, S])).
diff --git a/src/mem3/src/mem3_reshard_job.erl b/src/mem3/src/mem3_reshard_job.erl
index 6d2a438f7..bd4d89a6d 100644
--- a/src/mem3/src/mem3_reshard_job.erl
+++ b/src/mem3/src/mem3_reshard_job.erl
@@ -207,9 +207,10 @@ switch_state(#job{manager = ManagerPid} = Job0, NewState) ->
state_info = Info2,
workers = []
},
- ok = mem3_reshard:checkpoint(ManagerPid, check_state(Job)),
+ Job1 = update_split_state_history(Job),
+ ok = mem3_reshard:checkpoint(ManagerPid, check_state(Job1)),
gen_server:cast(self(), do_state),
- Job.
+ Job1.
-spec do_state(#job{}) -> #job{}.
@@ -501,3 +502,9 @@ reset_targets(#job{source = Source, targets = Targets} = Job) ->
end
end, Targets),
Job.
+
+
+-spec update_split_state_history(#job{}) -> #job{}.
+update_split_state_history(#job{split_state = St, time_updated = Ts} = Job) ->
+ Hist = Job#job.state_history,
+ Job#job{state_history = mem3_reshard:update_history(St, Ts, Hist)}.
diff --git a/src/mem3/src/mem3_reshard_store.erl b/src/mem3/src/mem3_reshard_store.erl
index 52d50bc55..c6ea94582 100644
--- a/src/mem3/src/mem3_reshard_store.erl
+++ b/src/mem3/src/mem3_reshard_store.erl
@@ -25,7 +25,7 @@
load_state/1,
delete_state/1, % for debugging
- job_to_ejson_props/1,
+ job_to_ejson_props/2,
state_to_ejson_props/1
]).
@@ -173,7 +173,21 @@ load_doc(Db, DocId) ->
end.
-job_to_ejson_props(#job{source = Source, targets = Targets} = Job) ->
+job_to_ejson_props(#job{} = Job) ->
+ job_to_ejson_props(Job, []).
+
+
+job_to_ejson_props(#job{source = Source, targets = Targets} = Job, Opts) ->
+ Iso8601 = proplists:get_value(iso8601, Opts),
+ History = state_history_to_ejson(Job#job.state_history, Iso8601),
+ TimeStarted = case Iso8601 of
+ true -> unix_to_iso8601(Job#job.time_started);
+ _ -> Job#job.time_started
+ end,
+ TimeUpdated = case Iso8601 of
+ true -> unix_to_iso8601(Job#job.time_updated);
+ _ -> Job#job.time_updated
+ end,
[
{id, Job#job.id},
{type, Job#job.type},
@@ -183,9 +197,9 @@ job_to_ejson_props(#job{source = Source, targets = Targets} = Job) ->
{split_state, Job#job.split_state},
{state_info, state_info_to_ejson(Job#job.state_info)},
{node, atom_to_binary(Job#job.node, utf8)},
- {time_created, Job#job.time_created},
- {time_started, Job#job.time_started},
- {time_updated, Job#job.time_updated}
+ {time_started, TimeStarted},
+ {time_updated, TimeUpdated},
+ {state_history, History}
].
@@ -197,9 +211,9 @@ job_from_ejson({Props}) ->
JobState = couch_util:get_value(<<"job_state">>, Props),
SplitState = couch_util:get_value(<<"split_state">>, Props),
StateInfo = couch_util:get_value(<<"state_info">>, Props),
- TCreated = couch_util:get_value(<<"time_created">>, Props),
TStarted = couch_util:get_value(<<"time_started">>, Props),
TUpdated = couch_util:get_value(<<"time_updated">>, Props),
+ History = couch_util:get_value(<<"state_history">>, Props),
#job{
id = Id,
type = binary_to_atom(Type, utf8),
@@ -207,11 +221,11 @@ job_from_ejson({Props}) ->
split_state = binary_to_atom(SplitState, utf8),
state_info = state_info_from_ejson(StateInfo),
node = node(),
- time_created = TCreated,
time_started = TStarted,
time_updated = TUpdated,
source = mem3_reshard:shard_from_name(Source),
- targets = [mem3_reshard:shard_from_name(T) || T <- Targets]
+ targets = [mem3_reshard:shard_from_name(T) || T <- Targets],
+ state_history = state_history_from_ejson(History)
}.
@@ -242,9 +256,33 @@ state_info_from_ejson({Props}) ->
lists:sort(Props1).
+state_history_to_ejson(History, true) when is_list(History) ->
+ [{[{state, S}, {ts, unix_to_iso8601(T)}]} || {S, T} <- History];
+
+state_history_to_ejson(History, _) when is_list(History) ->
+ [{[{state, S}, {ts, T}]} || {S, T} <- History].
+
+
+
+state_history_from_ejson(HistoryEJson) when is_list(HistoryEJson) ->
+ lists:map(fun({EventProps}) ->
+ State = couch_util:get_value(<<"state">>, EventProps),
+ Timestamp = couch_util:get_value(<<"ts">>, EventProps),
+ {binary_to_atom(State, utf8), Timestamp}
+ end, HistoryEJson).
+
+
state_info_to_ejson(Props) ->
{lists:sort([{K, couch_util:to_binary(V)} || {K, V} <- Props])}.
store_state() ->
config:get_boolean("mem3_reshard", "store_state", true).
+
+
+unix_to_iso8601(UnixSec) ->
+ Mega = UnixSec div 1000000,
+ Sec = UnixSec rem 1000000,
+ {{Y, Mon, D}, {H, Min, S}} = calendar:now_to_universal_time({Mega, Sec, 0}),
+ Format = "~B-~2..0B-~2..0BT~2..0B:~2..0B:~2..0BZ",
+ iolist_to_binary(io_lib:format(Format, [Y, Mon, D, H, Min, S])).