diff options
Diffstat (limited to 'src/mem3/src/mem3_reshard_store.erl')
-rw-r--r-- | src/mem3/src/mem3_reshard_store.erl | 81 |
1 files changed, 32 insertions, 49 deletions
diff --git a/src/mem3/src/mem3_reshard_store.erl b/src/mem3/src/mem3_reshard_store.erl index c3534b374..140cc5bd7 100644 --- a/src/mem3/src/mem3_reshard_store.erl +++ b/src/mem3/src/mem3_reshard_store.erl @@ -12,7 +12,6 @@ -module(mem3_reshard_store). - -export([ init/3, @@ -23,17 +22,16 @@ store_state/1, load_state/2, - delete_state/1, % for debugging + % for debugging + delete_state/1, job_to_ejson_props/2, state_to_ejson_props/1 ]). - -include_lib("couch/include/couch_db.hrl"). -include("mem3_reshard.hrl"). - -spec init(#state{}, binary(), binary()) -> #state{}. init(#state{} = State, JobPrefix, StateDocId) -> State#state{ @@ -41,7 +39,6 @@ init(#state{} = State, JobPrefix, StateDocId) -> state_id = <<?LOCAL_DOC_PREFIX, StateDocId/binary>> }. - -spec store_job(#state{}, #job{}) -> ok. store_job(#state{job_prefix = Prefix}, #job{id = Id} = Job) -> with_shards_db(fun(Db) -> @@ -49,7 +46,6 @@ store_job(#state{job_prefix = Prefix}, #job{id = Id} = Job) -> ok = update_doc(Db, DocId, job_to_ejson_props(Job)) end). - -spec load_job(#state{}, binary()) -> {ok, {[_]}} | not_found. load_job(#state{job_prefix = Prefix}, Id) -> with_shards_db(fun(Db) -> @@ -61,7 +57,6 @@ load_job(#state{job_prefix = Prefix}, Id) -> end end). - -spec delete_job(#state{}, binary()) -> ok. delete_job(#state{job_prefix = Prefix}, Id) -> with_shards_db(fun(Db) -> @@ -69,7 +64,6 @@ delete_job(#state{job_prefix = Prefix}, Id) -> ok = delete_doc(Db, DocId) end). - -spec get_jobs(#state{}) -> [#job{}]. get_jobs(#state{job_prefix = Prefix}) -> with_shards_db(fun(Db) -> @@ -87,14 +81,12 @@ get_jobs(#state{job_prefix = Prefix}) -> lists:reverse(Jobs) end). - -spec store_state(#state{}) -> ok. store_state(#state{state_id = DocId} = State) -> with_shards_db(fun(Db) -> ok = update_doc(Db, DocId, state_to_ejson_props(State)) end). - -spec load_state(#state{}, atom()) -> #state{}. load_state(#state{state_id = DocId} = State, Default) -> with_shards_db(fun(Db) -> @@ -106,25 +98,25 @@ load_state(#state{state_id = DocId} = State, Default) -> end end). - -spec delete_state(#state{}) -> ok. delete_state(#state{state_id = DocId}) -> with_shards_db(fun(Db) -> ok = delete_doc(Db, DocId) end). - job_to_ejson_props(#job{source = Source, target = Targets} = Job, Opts) -> Iso8601 = proplists:get_value(iso8601, Opts), History = history_to_ejson(Job#job.history, Iso8601), - StartTime = case Iso8601 of - true -> iso8601(Job#job.start_time); - _ -> Job#job.start_time - end, - UpdateTime = case Iso8601 of - true -> iso8601(Job#job.update_time); - _ -> Job#job.update_time - end, + StartTime = + case Iso8601 of + true -> iso8601(Job#job.start_time); + _ -> Job#job.start_time + end, + UpdateTime = + case Iso8601 of + true -> iso8601(Job#job.update_time); + _ -> Job#job.update_time + end, [ {id, Job#job.id}, {type, Job#job.type}, @@ -139,7 +131,6 @@ job_to_ejson_props(#job{source = Source, target = Targets} = Job, Opts) -> {history, History} ]. - state_to_ejson_props(#state{} = State) -> [ {state, atom_to_binary(State#state.state, utf8)}, @@ -148,7 +139,6 @@ state_to_ejson_props(#state{} = State) -> {node, atom_to_binary(State#state.node, utf8)} ]. - % Private API with_shards_db(Fun) -> @@ -164,7 +154,6 @@ with_shards_db(Fun) -> throw(Else) end. - delete_doc(Db, DocId) -> case couch_db:open_doc(Db, DocId, []) of {ok, #doc{revs = {_, Revs}}} -> @@ -174,17 +163,17 @@ delete_doc(Db, DocId) -> ok end. - update_doc(Db, DocId, Body) -> DocProps = [{<<"_id">>, DocId}] ++ Body, Body1 = ?JSON_DECODE(?JSON_ENCODE({DocProps})), BaseDoc = couch_doc:from_json_obj(Body1), - Doc = case couch_db:open_doc(Db, DocId, []) of - {ok, #doc{revs = Revs}} -> - BaseDoc#doc{revs = Revs}; - {not_found, _} -> - BaseDoc - end, + Doc = + case couch_db:open_doc(Db, DocId, []) of + {ok, #doc{revs = Revs}} -> + BaseDoc#doc{revs = Revs}; + {not_found, _} -> + BaseDoc + end, case store_state() of true -> {ok, _} = couch_db:update_doc(Db, Doc, []), @@ -195,7 +184,6 @@ update_doc(Db, DocId, Body) -> ok end. - load_doc(Db, DocId) -> case couch_db:open_doc(Db, DocId, [ejson_body]) of {ok, #doc{body = Body}} -> @@ -205,11 +193,9 @@ load_doc(Db, DocId) -> not_found end. - job_to_ejson_props(#job{} = Job) -> job_to_ejson_props(Job, []). - job_from_ejson({Props}) -> Id = couch_util:get_value(<<"id">>, Props), Type = couch_util:get_value(<<"type">>, Props), @@ -235,7 +221,6 @@ job_from_ejson({Props}) -> history = history_from_ejson(History) }. - state_from_ejson(#state{} = State, {Props}) -> StateVal = couch_util:get_value(<<"state">>, Props), StateInfo = couch_util:get_value(<<"state_info">>, Props), @@ -247,37 +232,35 @@ state_from_ejson(#state{} = State, {Props}) -> update_time = TUpdated }. - state_info_from_ejson({Props}) -> - Props1 = [{binary_to_atom(K, utf8), couch_util:to_binary(V)} - || {K, V} <- Props], + Props1 = [ + {binary_to_atom(K, utf8), couch_util:to_binary(V)} + || {K, V} <- Props + ], lists:sort(Props1). - history_to_ejson(Hist, true) when is_list(Hist) -> [{[{timestamp, iso8601(T)}, {type, S}, {detail, D}]} || {T, S, D} <- Hist]; - history_to_ejson(Hist, _) when is_list(Hist) -> [{[{timestamp, T}, {type, S}, {detail, D}]} || {T, S, D} <- Hist]. - history_from_ejson(HistoryEJson) when is_list(HistoryEJson) -> - lists:map(fun({EventProps}) -> - Timestamp = couch_util:get_value(<<"timestamp">>, EventProps), - State = couch_util:get_value(<<"type">>, EventProps), - Detail = couch_util:get_value(<<"detail">>, EventProps), - {Timestamp, binary_to_atom(State, utf8), Detail} - end, HistoryEJson). - + lists:map( + fun({EventProps}) -> + Timestamp = couch_util:get_value(<<"timestamp">>, EventProps), + State = couch_util:get_value(<<"type">>, EventProps), + Detail = couch_util:get_value(<<"detail">>, EventProps), + {Timestamp, binary_to_atom(State, utf8), Detail} + end, + HistoryEJson + ). state_info_to_ejson(Props) -> {lists:sort([{K, couch_util:to_binary(V)} || {K, V} <- Props])}. - store_state() -> config:get_boolean("reshard", "store_state", true). - iso8601(UnixSec) -> Mega = UnixSec div 1000000, Sec = UnixSec rem 1000000, |