summaryrefslogtreecommitdiff
path: root/src/mem3/src/mem3_reshard_store.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/mem3/src/mem3_reshard_store.erl')
-rw-r--r--src/mem3/src/mem3_reshard_store.erl81
1 files changed, 32 insertions, 49 deletions
diff --git a/src/mem3/src/mem3_reshard_store.erl b/src/mem3/src/mem3_reshard_store.erl
index c3534b374..140cc5bd7 100644
--- a/src/mem3/src/mem3_reshard_store.erl
+++ b/src/mem3/src/mem3_reshard_store.erl
@@ -12,7 +12,6 @@
-module(mem3_reshard_store).
-
-export([
init/3,
@@ -23,17 +22,16 @@
store_state/1,
load_state/2,
- delete_state/1, % for debugging
+ % for debugging
+ delete_state/1,
job_to_ejson_props/2,
state_to_ejson_props/1
]).
-
-include_lib("couch/include/couch_db.hrl").
-include("mem3_reshard.hrl").
-
-spec init(#state{}, binary(), binary()) -> #state{}.
init(#state{} = State, JobPrefix, StateDocId) ->
State#state{
@@ -41,7 +39,6 @@ init(#state{} = State, JobPrefix, StateDocId) ->
state_id = <<?LOCAL_DOC_PREFIX, StateDocId/binary>>
}.
-
-spec store_job(#state{}, #job{}) -> ok.
store_job(#state{job_prefix = Prefix}, #job{id = Id} = Job) ->
with_shards_db(fun(Db) ->
@@ -49,7 +46,6 @@ store_job(#state{job_prefix = Prefix}, #job{id = Id} = Job) ->
ok = update_doc(Db, DocId, job_to_ejson_props(Job))
end).
-
-spec load_job(#state{}, binary()) -> {ok, {[_]}} | not_found.
load_job(#state{job_prefix = Prefix}, Id) ->
with_shards_db(fun(Db) ->
@@ -61,7 +57,6 @@ load_job(#state{job_prefix = Prefix}, Id) ->
end
end).
-
-spec delete_job(#state{}, binary()) -> ok.
delete_job(#state{job_prefix = Prefix}, Id) ->
with_shards_db(fun(Db) ->
@@ -69,7 +64,6 @@ delete_job(#state{job_prefix = Prefix}, Id) ->
ok = delete_doc(Db, DocId)
end).
-
-spec get_jobs(#state{}) -> [#job{}].
get_jobs(#state{job_prefix = Prefix}) ->
with_shards_db(fun(Db) ->
@@ -87,14 +81,12 @@ get_jobs(#state{job_prefix = Prefix}) ->
lists:reverse(Jobs)
end).
-
-spec store_state(#state{}) -> ok.
store_state(#state{state_id = DocId} = State) ->
with_shards_db(fun(Db) ->
ok = update_doc(Db, DocId, state_to_ejson_props(State))
end).
-
-spec load_state(#state{}, atom()) -> #state{}.
load_state(#state{state_id = DocId} = State, Default) ->
with_shards_db(fun(Db) ->
@@ -106,25 +98,25 @@ load_state(#state{state_id = DocId} = State, Default) ->
end
end).
-
-spec delete_state(#state{}) -> ok.
delete_state(#state{state_id = DocId}) ->
with_shards_db(fun(Db) ->
ok = delete_doc(Db, DocId)
end).
-
job_to_ejson_props(#job{source = Source, target = Targets} = Job, Opts) ->
Iso8601 = proplists:get_value(iso8601, Opts),
History = history_to_ejson(Job#job.history, Iso8601),
- StartTime = case Iso8601 of
- true -> iso8601(Job#job.start_time);
- _ -> Job#job.start_time
- end,
- UpdateTime = case Iso8601 of
- true -> iso8601(Job#job.update_time);
- _ -> Job#job.update_time
- end,
+ StartTime =
+ case Iso8601 of
+ true -> iso8601(Job#job.start_time);
+ _ -> Job#job.start_time
+ end,
+ UpdateTime =
+ case Iso8601 of
+ true -> iso8601(Job#job.update_time);
+ _ -> Job#job.update_time
+ end,
[
{id, Job#job.id},
{type, Job#job.type},
@@ -139,7 +131,6 @@ job_to_ejson_props(#job{source = Source, target = Targets} = Job, Opts) ->
{history, History}
].
-
state_to_ejson_props(#state{} = State) ->
[
{state, atom_to_binary(State#state.state, utf8)},
@@ -148,7 +139,6 @@ state_to_ejson_props(#state{} = State) ->
{node, atom_to_binary(State#state.node, utf8)}
].
-
% Private API
with_shards_db(Fun) ->
@@ -164,7 +154,6 @@ with_shards_db(Fun) ->
throw(Else)
end.
-
delete_doc(Db, DocId) ->
case couch_db:open_doc(Db, DocId, []) of
{ok, #doc{revs = {_, Revs}}} ->
@@ -174,17 +163,17 @@ delete_doc(Db, DocId) ->
ok
end.
-
update_doc(Db, DocId, Body) ->
DocProps = [{<<"_id">>, DocId}] ++ Body,
Body1 = ?JSON_DECODE(?JSON_ENCODE({DocProps})),
BaseDoc = couch_doc:from_json_obj(Body1),
- Doc = case couch_db:open_doc(Db, DocId, []) of
- {ok, #doc{revs = Revs}} ->
- BaseDoc#doc{revs = Revs};
- {not_found, _} ->
- BaseDoc
- end,
+ Doc =
+ case couch_db:open_doc(Db, DocId, []) of
+ {ok, #doc{revs = Revs}} ->
+ BaseDoc#doc{revs = Revs};
+ {not_found, _} ->
+ BaseDoc
+ end,
case store_state() of
true ->
{ok, _} = couch_db:update_doc(Db, Doc, []),
@@ -195,7 +184,6 @@ update_doc(Db, DocId, Body) ->
ok
end.
-
load_doc(Db, DocId) ->
case couch_db:open_doc(Db, DocId, [ejson_body]) of
{ok, #doc{body = Body}} ->
@@ -205,11 +193,9 @@ load_doc(Db, DocId) ->
not_found
end.
-
job_to_ejson_props(#job{} = Job) ->
job_to_ejson_props(Job, []).
-
job_from_ejson({Props}) ->
Id = couch_util:get_value(<<"id">>, Props),
Type = couch_util:get_value(<<"type">>, Props),
@@ -235,7 +221,6 @@ job_from_ejson({Props}) ->
history = history_from_ejson(History)
}.
-
state_from_ejson(#state{} = State, {Props}) ->
StateVal = couch_util:get_value(<<"state">>, Props),
StateInfo = couch_util:get_value(<<"state_info">>, Props),
@@ -247,37 +232,35 @@ state_from_ejson(#state{} = State, {Props}) ->
update_time = TUpdated
}.
-
state_info_from_ejson({Props}) ->
- Props1 = [{binary_to_atom(K, utf8), couch_util:to_binary(V)}
- || {K, V} <- Props],
+ Props1 = [
+ {binary_to_atom(K, utf8), couch_util:to_binary(V)}
+ || {K, V} <- Props
+ ],
lists:sort(Props1).
-
history_to_ejson(Hist, true) when is_list(Hist) ->
[{[{timestamp, iso8601(T)}, {type, S}, {detail, D}]} || {T, S, D} <- Hist];
-
history_to_ejson(Hist, _) when is_list(Hist) ->
[{[{timestamp, T}, {type, S}, {detail, D}]} || {T, S, D} <- Hist].
-
history_from_ejson(HistoryEJson) when is_list(HistoryEJson) ->
- lists:map(fun({EventProps}) ->
- Timestamp = couch_util:get_value(<<"timestamp">>, EventProps),
- State = couch_util:get_value(<<"type">>, EventProps),
- Detail = couch_util:get_value(<<"detail">>, EventProps),
- {Timestamp, binary_to_atom(State, utf8), Detail}
- end, HistoryEJson).
-
+ lists:map(
+ fun({EventProps}) ->
+ Timestamp = couch_util:get_value(<<"timestamp">>, EventProps),
+ State = couch_util:get_value(<<"type">>, EventProps),
+ Detail = couch_util:get_value(<<"detail">>, EventProps),
+ {Timestamp, binary_to_atom(State, utf8), Detail}
+ end,
+ HistoryEJson
+ ).
state_info_to_ejson(Props) ->
{lists:sort([{K, couch_util:to_binary(V)} || {K, V} <- Props])}.
-
store_state() ->
config:get_boolean("reshard", "store_state", true).
-
iso8601(UnixSec) ->
Mega = UnixSec div 1000000,
Sec = UnixSec rem 1000000,