diff options
author | Paul J. Davis <paul.joseph.davis@gmail.com> | 2019-04-12 13:24:34 -0500 |
---|---|---|
committer | Paul J. Davis <paul.joseph.davis@gmail.com> | 2019-04-12 13:55:09 -0500 |
commit | 20c1846fe0db79fee53cf554497b1235356eef9b (patch) | |
tree | 9fd06302b1b14667566da09fbb6c4ba970f1de75 | |
parent | db3b066e290a8e20b78311dcbf2f5a4b5018c8e0 (diff) | |
download | couchdb-20c1846fe0db79fee53cf554497b1235356eef9b.tar.gz |
First pass at handling COMMIT_UNKNOWN_RESULT
-rw-r--r-- | src/fabric/src/fabric2_db.erl | 48 | ||||
-rw-r--r-- | src/fabric/src/fabric2_fdb.erl | 117 | ||||
-rw-r--r-- | src/fabric/src/fabric2_txid_cleaner.erl | 131 | ||||
-rw-r--r-- | src/fabric/src/fabric2_util.erl | 22 |
4 files changed, 249 insertions, 69 deletions
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl index 8168f419c..52ddc816c 100644 --- a/src/fabric/src/fabric2_db.erl +++ b/src/fabric/src/fabric2_db.erl @@ -162,16 +162,7 @@ create(DbName, Options) -> open(DbName, Options) -> case fabric2_server:fetch(DbName) of #{} = Db -> - with_tx(Db, fun(TxDb) -> - case fabric2_fdb:is_current(TxDb) of - true -> - {ok, maybe_set_user_ctx(Db, Options)}; - false -> - Reopened = fabric2_fdb:open(TxDb, Options), - ok = fabric2_server:store(Reopened), - {ok, Reopened} - end - end); + {ok, maybe_set_user_ctx(Db, Options)}; undefined -> transactional(DbName, Options, fun(TxDb) -> Opened = fabric2_fdb:open(TxDb, Options), @@ -184,7 +175,7 @@ open(DbName, Options) -> delete(DbName, Options) -> % This will throw if the db does not exist {ok, Db} = open(DbName, Options), - with_tx(Db, fun(TxDb) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> fabric2_fdb:delete(TxDb) end). @@ -252,7 +243,7 @@ get_compactor_pid(#{} = _Db) -> get_db_info(#{} = Db) -> - DbProps = with_tx(Db, fun(TxDb) -> + DbProps = fabric2_fdb:transactional(Db, fun(TxDb) -> fabric2_fdb:get_info(TxDb) end), @@ -290,7 +281,7 @@ get_doc_count(DbName, <<"_local">>) -> get_doc_count(DbName, <<"doc_local_count">>); get_doc_count(Db, Key) -> - with_tx(Db, fun(TxDb) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> fabric2_fdb:get_stat(TxDb, Key) end). @@ -361,14 +352,14 @@ is_system_db_name(DbName) when is_binary(DbName) -> set_revs_limit(#{} = Db, RevsLimit) -> RevsLimBin = ?uint2bin(RevsLimit), - with_tx(Db, fun(TxDb) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> fabric2_fdb:set_config(TxDb, <<"revs_limit">>, RevsLimBin) end). set_security(#{} = Db, Security) -> SecBin = ?JSON_ENCODE(Security), - with_tx(Db, fun(TxDb) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> fabric2_fdb:set_config(TxDb, <<"security_doc">>, SecBin) end). @@ -390,7 +381,7 @@ open_doc(#{} = Db, DocId) -> open_doc(#{} = Db, DocId, _Options) -> - with_tx(Db, fun(TxDb) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> case fabric2_fdb:get_winning_revs(TxDb, DocId, 1) of [] -> {not_found, missing}; @@ -405,7 +396,7 @@ open_doc(#{} = Db, DocId, _Options) -> open_doc_revs(Db, DocId, Revs, Options) -> Latest = lists:member(latest, Options), - with_tx(Db, fun(TxDb) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> #full_doc_info{ rev_tree = RevTree } = fabric2_db:get_full_doc_info(TxDb, DocId), @@ -439,7 +430,7 @@ update_doc(Db, Doc) -> update_doc(Db, Doc, Options) -> - with_tx(Db, fun(TxDb) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> update_doc_int(TxDb, Doc, Options) end). @@ -450,7 +441,7 @@ update_docs(Db, Docs) -> update_docs(Db, Docs, Options) -> {Resps, Status} = lists:mapfoldl(fun(Doc, Acc) -> - with_tx(Db, fun(TxDb) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> case update_doc_int(TxDb, Doc, Options) of {ok, _} = Resp -> {Resp, Acc}; @@ -467,7 +458,7 @@ fold_docs(Db, UserFun, UserAcc) -> fold_docs(Db, UserFun, UserAcc, Options) -> - with_tx(Db, fun(TxDb) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> fabric2_fdb:fold_docs(TxDb, UserFun, UserAcc, Options) end). @@ -477,7 +468,7 @@ fold_changes(Db, SinceSeq, UserFun, UserAcc) -> fold_changes(Db, SinceSeq, UserFun, UserAcc, Options) -> - with_tx(Db, fun(TxDb) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> fabric2_fdb:fold_changes(TxDb, SinceSeq, UserFun, UserAcc, Options) end). @@ -893,21 +884,6 @@ find_prev_revinfo(_Pos, [{_Rev, #{} = RevInfo} | _]) -> RevInfo. -transactional(DbName, Options, Fun) -> - fabric2_util:transactional(fun(Tx) -> - Fun(fabric2_fdb:init(Tx, DbName, Options)) - end). - - -with_tx(#{tx := undefined} = Db, Fun) -> - fabric2_util:transactional(fun(Tx) -> - Fun(Db#{tx => Tx}) - end); - -with_tx(#{tx := {erlfdb_transaction, _}} = Db, Fun) -> - Fun(Db). - - doc_to_revid(#doc{revs = Revs}) -> case Revs of {0, []} -> {0, <<>>}; diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl index 3416b874c..549e94feb 100644 --- a/src/fabric/src/fabric2_fdb.erl +++ b/src/fabric/src/fabric2_fdb.erl @@ -14,7 +14,9 @@ -export([ - init/3, + transactional/1, + transactional/3, + transactional/2, create/2, open/2, @@ -67,6 +69,7 @@ -define(CLUSTER_CONFIG, 0). -define(ALL_DBS, 1). -define(DBS, 15). +-define(TX_IDS, 255). -define(DB_CONFIG, 16). -define(DB_STATS, 17). @@ -82,6 +85,15 @@ -define(CURR_REV_FORMAT, 0). +% Misc constants + +-define(PDICT_DB_KEY, '$fabric_db_handle'). +-define(PDICT_LAYER_CACHE, '$fabric_layer_id'). +-define(PDICT_TX_ID_KEY, '$fabric_tx_id'). +-define(PDICT_TX_RES_KEY, '$fabric_tx_result'). +-define(COMMIT_UNKOWN_RESULT, 1021). + + % Various utility macros -define(REQUIRE_TX(Db), {erlfdb_transaction, _} = maps:get(tx, Db)). @@ -90,16 +102,30 @@ -define(UNSET_VS, {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF}). -init(Tx, DbName, Options) -> - Root = erlfdb_directory:root(), - CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]), - Prefix = erlfdb_directory:get_name(CouchDB), - #{ - name => DbName, - tx => Tx, - layer_prefix => Prefix, - options => Options - }. +transactional(Fun) when is_function(Fun, 1) -> + Db = get_db_handle(), + try + erlfdb:transactional(Db, fun(Tx) -> + tx_wrap(Tx, Fun) + end) + after + tx_clear() + end. + + +transactional(DbName, Options, Fun) when is_binary(DbName) -> + transactional(fun(Tx) -> + Fun(init_db(Tx, DbName, Options)) + end). + + +transactional(#{tx := undefined} = Db, Fun) -> + transactional(fun(Tx) -> + Fun(Db#{tx => Tx}) + end); + +transactional(#{tx := {erlfdb_transaction, _}} = Db, Fun) -> + Fun(Db). create(#{} = Db, Options) -> @@ -615,6 +641,18 @@ get_changes(#{} = Db, Options) -> end, erlfdb:wait(Future)). +init(Tx, DbName, Options) -> + Root = erlfdb_directory:root(), + CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]), + Prefix = erlfdb_directory:get_name(CouchDB), + #{ + name => DbName, + tx => Tx, + layer_prefix => Prefix, + options => Options + }. + + bump_metadata_version(Tx) -> % The 14 zero bytes is pulled from the PR for adding the % metadata version key. Not sure why 14 bytes when version @@ -702,3 +740,60 @@ fdb_to_doc(_Db, DocId, Pos, Path, Bin) when is_binary(Bin) -> }; fdb_to_doc(_Db, _DocId, _Pos, _Path, not_found) -> {not_found, missing}. + + +get_db_handle() -> + case get(?PDICT_DB_KEY) of + undefined -> + {ok, Db} = application:get_env(fabric, db), + put(?PDICT_DB_KEY, Db), + Db; + Db -> + Db + end. + + +tx_wrap(Tx, Fun) -> + case get(erlfdb_error) of + ?COMMIT_UNKNOWN_RESULT -> + tx_check_applied(Tx, Fun); + _ -> + tx_attempt(Tx, Fun) + end. + + +tx_clear() -> + fabric2_txid_cleaner:remove(get(?PDICT_TX_ID_KEY)), + put(?PDICT_TX_ID_KEY, undefined), + put(?PDICT_TX_RES_KEY, undefined). + + +tx_check_applied(Tx, Fun) -> + case get(?PDICT_TX_ID_KEY) of + undefined -> + tx_attempt(Tx, Fun); + TxId when is_binary(TxId) -> + case erlfdb:wait(erlfdb:get(Tx, TxId)) of + <<>> -> + get(?PDICT_TX_RES_KEY); + not_found -> + tx_attempt(Tx, Fun) + end + end. + + +tx_attempt(Tx, Fun) -> + Result = Fun(Tx), + TxId = tx_id(Tx), + ok = erlfdb:set(Tx, TxId, <<>>), + put(?PDICT_TX_RES_KEY, Result), + Result. + + +tx_id(Tx) -> + Root = erlfdb_directory:root(), + CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]), + Prefix = erlfdb_directory:get_name(CouchDB), + {Mega, Secs, Micro} = os:timestamp(), + Key = {?TX_IDS, Mega, Secs, Micro, fabric2_util:uuid()}, + erlfdb_tuple:pack(Key, Prefix). diff --git a/src/fabric/src/fabric2_txid_cleaner.erl b/src/fabric/src/fabric2_txid_cleaner.erl new file mode 100644 index 000000000..eaefbcc99 --- /dev/null +++ b/src/fabric/src/fabric2_txid_cleaner.erl @@ -0,0 +1,131 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric2_txid_cleaner). +-behaviour(gen_server). +-vsn(1). + + +-export([ + start_link/0, + remove/1 +]). + + +-export([ + init/1, + terminate/2, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3 +]). + + +-include("fabric2.hrl"). + + +-define(ONE_HOUR, 3600000000). +-define(MAX_TX_IDS, 1000). + + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + + +remove(TxId) when is_binary(TxId) -> + gen_server:cast(?MODULE, {remove, TxId}); + +remove(undefined) -> + ok. + + + +init(_) -> + {ok, #{ + last_sweep => os:timestamp(), + txids => [] + }}. + + +terminate(_, #{txids := TxIds}) -> + if TxIds == [] -> ok; true -> + fabric2_fdb:transactional(fun(Tx) -> + lists:foreach(fun(TxId) -> + erlfdb:clear(Tx, TxId) + end) + end) + end, + ok. + + +handle_call(Msg, _From, St) -> + {stop, {bad_call, Msg}, {bad_call, Msg}, St}. + + +handle_cast({remove, TxId}, St) -> + #{ + last_sweep := LastSweep, + txidsd := TxIds + } = St, + + NewTxIds = [TxId | TxIds], + NewSt = St#{txids := NewTxIds}, + + NeedsSweep = os:timer_diff(os:timestamp(), LastSweep) > ?ONE_HOUR, + + case NeedsSweep orelse length(NewTxIds) >= ?MAX_TX_IDS of + true -> + {noreply, clean(NewSt, NeedsSweep)}; + false -> + {noreply, NewSt} + end. + + +handle_info(Msg, St) -> + {stop, {bad_info, Msg}, St}. + + +code_change(_OldVsn, St, _Extra) -> + {ok, St}. + + +clean(St, NeedsSweep) -> + #{ + last_sweep := LastSweep, + txids := TxIds + } = St, + fabric2_fdb:transactional(fun(Tx) -> + lists:foreach(fun(TxId) -> + erlfdb:clear(Tx, TxId) + end, TxIds), + case NeedsSweep of + true -> + sweep(Tx, LastSweep), + St#{ + last_sweep := os:timestamp(), + txids := [] + }; + false -> + St#{txids := []} + end + end). + + +sweep(Tx, {Mega, Secs, Micro}) -> + Root = erlfdb_directory:root(), + CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]), + Prefix = erlfdb_directory:get_name(CouchDB), + StartKey = erlfdb_tuple:pack({?TX_IDS}, Prefix), + EndKey = erlfdb_tuple:pack({?TX_IDS, Mega, Secs, Micro}, Prefix), + erlfdb:set_option(Tx, next_write_no_write_conflict_range), + erlfdb:clear_range(Tx, StartKey, EndKey). diff --git a/src/fabric/src/fabric2_util.erl b/src/fabric/src/fabric2_util.erl index 021a0f7e7..8b3a3e3dd 100644 --- a/src/fabric/src/fabric2_util.erl +++ b/src/fabric/src/fabric2_util.erl @@ -14,9 +14,6 @@ -export([ - transactional/1, - get_db_handle/0, - revinfo_to_path/1, find_winning_revinfo/1, @@ -35,25 +32,6 @@ -include_lib("couch/include/couch_db.hrl"). --define(PDICT_DB_KEY, '$erlfdb_handle'). - - -transactional(Fun) when is_function(Fun, 1) -> - Db = get_db_handle(), - erlfdb:transactional(Db, Fun). - - -get_db_handle() -> - case get(?PDICT_DB_KEY) of - undefined -> - {ok, Db} = application:get_env(fabric, db), - put(?PDICT_DB_KEY, Db), - Db; - Db -> - Db - end. - - revinfo_to_path(RevInfo) -> #{ rev_id := {RevPos, Rev}, |