summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2019-04-12 13:24:34 -0500
committerPaul J. Davis <paul.joseph.davis@gmail.com>2019-04-12 13:55:09 -0500
commit20c1846fe0db79fee53cf554497b1235356eef9b (patch)
tree9fd06302b1b14667566da09fbb6c4ba970f1de75
parentdb3b066e290a8e20b78311dcbf2f5a4b5018c8e0 (diff)
downloadcouchdb-20c1846fe0db79fee53cf554497b1235356eef9b.tar.gz
First pass at handling COMMIT_UNKNOWN_RESULT
-rw-r--r--src/fabric/src/fabric2_db.erl48
-rw-r--r--src/fabric/src/fabric2_fdb.erl117
-rw-r--r--src/fabric/src/fabric2_txid_cleaner.erl131
-rw-r--r--src/fabric/src/fabric2_util.erl22
4 files changed, 249 insertions, 69 deletions
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index 8168f419c..52ddc816c 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -162,16 +162,7 @@ create(DbName, Options) ->
open(DbName, Options) ->
case fabric2_server:fetch(DbName) of
#{} = Db ->
- with_tx(Db, fun(TxDb) ->
- case fabric2_fdb:is_current(TxDb) of
- true ->
- {ok, maybe_set_user_ctx(Db, Options)};
- false ->
- Reopened = fabric2_fdb:open(TxDb, Options),
- ok = fabric2_server:store(Reopened),
- {ok, Reopened}
- end
- end);
+ {ok, maybe_set_user_ctx(Db, Options)};
undefined ->
transactional(DbName, Options, fun(TxDb) ->
Opened = fabric2_fdb:open(TxDb, Options),
@@ -184,7 +175,7 @@ open(DbName, Options) ->
delete(DbName, Options) ->
% This will throw if the db does not exist
{ok, Db} = open(DbName, Options),
- with_tx(Db, fun(TxDb) ->
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
fabric2_fdb:delete(TxDb)
end).
@@ -252,7 +243,7 @@ get_compactor_pid(#{} = _Db) ->
get_db_info(#{} = Db) ->
- DbProps = with_tx(Db, fun(TxDb) ->
+ DbProps = fabric2_fdb:transactional(Db, fun(TxDb) ->
fabric2_fdb:get_info(TxDb)
end),
@@ -290,7 +281,7 @@ get_doc_count(DbName, <<"_local">>) ->
get_doc_count(DbName, <<"doc_local_count">>);
get_doc_count(Db, Key) ->
- with_tx(Db, fun(TxDb) ->
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
fabric2_fdb:get_stat(TxDb, Key)
end).
@@ -361,14 +352,14 @@ is_system_db_name(DbName) when is_binary(DbName) ->
set_revs_limit(#{} = Db, RevsLimit) ->
RevsLimBin = ?uint2bin(RevsLimit),
- with_tx(Db, fun(TxDb) ->
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
fabric2_fdb:set_config(TxDb, <<"revs_limit">>, RevsLimBin)
end).
set_security(#{} = Db, Security) ->
SecBin = ?JSON_ENCODE(Security),
- with_tx(Db, fun(TxDb) ->
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
fabric2_fdb:set_config(TxDb, <<"security_doc">>, SecBin)
end).
@@ -390,7 +381,7 @@ open_doc(#{} = Db, DocId) ->
open_doc(#{} = Db, DocId, _Options) ->
- with_tx(Db, fun(TxDb) ->
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
case fabric2_fdb:get_winning_revs(TxDb, DocId, 1) of
[] ->
{not_found, missing};
@@ -405,7 +396,7 @@ open_doc(#{} = Db, DocId, _Options) ->
open_doc_revs(Db, DocId, Revs, Options) ->
Latest = lists:member(latest, Options),
- with_tx(Db, fun(TxDb) ->
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
#full_doc_info{
rev_tree = RevTree
} = fabric2_db:get_full_doc_info(TxDb, DocId),
@@ -439,7 +430,7 @@ update_doc(Db, Doc) ->
update_doc(Db, Doc, Options) ->
- with_tx(Db, fun(TxDb) ->
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
update_doc_int(TxDb, Doc, Options)
end).
@@ -450,7 +441,7 @@ update_docs(Db, Docs) ->
update_docs(Db, Docs, Options) ->
{Resps, Status} = lists:mapfoldl(fun(Doc, Acc) ->
- with_tx(Db, fun(TxDb) ->
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
case update_doc_int(TxDb, Doc, Options) of
{ok, _} = Resp ->
{Resp, Acc};
@@ -467,7 +458,7 @@ fold_docs(Db, UserFun, UserAcc) ->
fold_docs(Db, UserFun, UserAcc, Options) ->
- with_tx(Db, fun(TxDb) ->
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
fabric2_fdb:fold_docs(TxDb, UserFun, UserAcc, Options)
end).
@@ -477,7 +468,7 @@ fold_changes(Db, SinceSeq, UserFun, UserAcc) ->
fold_changes(Db, SinceSeq, UserFun, UserAcc, Options) ->
- with_tx(Db, fun(TxDb) ->
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
fabric2_fdb:fold_changes(TxDb, SinceSeq, UserFun, UserAcc, Options)
end).
@@ -893,21 +884,6 @@ find_prev_revinfo(_Pos, [{_Rev, #{} = RevInfo} | _]) ->
RevInfo.
-transactional(DbName, Options, Fun) ->
- fabric2_util:transactional(fun(Tx) ->
- Fun(fabric2_fdb:init(Tx, DbName, Options))
- end).
-
-
-with_tx(#{tx := undefined} = Db, Fun) ->
- fabric2_util:transactional(fun(Tx) ->
- Fun(Db#{tx => Tx})
- end);
-
-with_tx(#{tx := {erlfdb_transaction, _}} = Db, Fun) ->
- Fun(Db).
-
-
doc_to_revid(#doc{revs = Revs}) ->
case Revs of
{0, []} -> {0, <<>>};
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index 3416b874c..549e94feb 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -14,7 +14,9 @@
-export([
- init/3,
+ transactional/1,
+ transactional/3,
+ transactional/2,
create/2,
open/2,
@@ -67,6 +69,7 @@
-define(CLUSTER_CONFIG, 0).
-define(ALL_DBS, 1).
-define(DBS, 15).
+-define(TX_IDS, 255).
-define(DB_CONFIG, 16).
-define(DB_STATS, 17).
@@ -82,6 +85,15 @@
-define(CURR_REV_FORMAT, 0).
+% Misc constants
+
+-define(PDICT_DB_KEY, '$fabric_db_handle').
+-define(PDICT_LAYER_CACHE, '$fabric_layer_id').
+-define(PDICT_TX_ID_KEY, '$fabric_tx_id').
+-define(PDICT_TX_RES_KEY, '$fabric_tx_result').
+-define(COMMIT_UNKOWN_RESULT, 1021).
+
+
% Various utility macros
-define(REQUIRE_TX(Db), {erlfdb_transaction, _} = maps:get(tx, Db)).
@@ -90,16 +102,30 @@
-define(UNSET_VS, {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF}).
-init(Tx, DbName, Options) ->
- Root = erlfdb_directory:root(),
- CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]),
- Prefix = erlfdb_directory:get_name(CouchDB),
- #{
- name => DbName,
- tx => Tx,
- layer_prefix => Prefix,
- options => Options
- }.
+transactional(Fun) when is_function(Fun, 1) ->
+ Db = get_db_handle(),
+ try
+ erlfdb:transactional(Db, fun(Tx) ->
+ tx_wrap(Tx, Fun)
+ end)
+ after
+ tx_clear()
+ end.
+
+
+transactional(DbName, Options, Fun) when is_binary(DbName) ->
+ transactional(fun(Tx) ->
+ Fun(init_db(Tx, DbName, Options))
+ end).
+
+
+transactional(#{tx := undefined} = Db, Fun) ->
+ transactional(fun(Tx) ->
+ Fun(Db#{tx => Tx})
+ end);
+
+transactional(#{tx := {erlfdb_transaction, _}} = Db, Fun) ->
+ Fun(Db).
create(#{} = Db, Options) ->
@@ -615,6 +641,18 @@ get_changes(#{} = Db, Options) ->
end, erlfdb:wait(Future)).
+init(Tx, DbName, Options) ->
+ Root = erlfdb_directory:root(),
+ CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]),
+ Prefix = erlfdb_directory:get_name(CouchDB),
+ #{
+ name => DbName,
+ tx => Tx,
+ layer_prefix => Prefix,
+ options => Options
+ }.
+
+
bump_metadata_version(Tx) ->
% The 14 zero bytes is pulled from the PR for adding the
% metadata version key. Not sure why 14 bytes when version
@@ -702,3 +740,60 @@ fdb_to_doc(_Db, DocId, Pos, Path, Bin) when is_binary(Bin) ->
};
fdb_to_doc(_Db, _DocId, _Pos, _Path, not_found) ->
{not_found, missing}.
+
+
+get_db_handle() ->
+ case get(?PDICT_DB_KEY) of
+ undefined ->
+ {ok, Db} = application:get_env(fabric, db),
+ put(?PDICT_DB_KEY, Db),
+ Db;
+ Db ->
+ Db
+ end.
+
+
+tx_wrap(Tx, Fun) ->
+ case get(erlfdb_error) of
+ ?COMMIT_UNKNOWN_RESULT ->
+ tx_check_applied(Tx, Fun);
+ _ ->
+ tx_attempt(Tx, Fun)
+ end.
+
+
+tx_clear() ->
+ fabric2_txid_cleaner:remove(get(?PDICT_TX_ID_KEY)),
+ put(?PDICT_TX_ID_KEY, undefined),
+ put(?PDICT_TX_RES_KEY, undefined).
+
+
+tx_check_applied(Tx, Fun) ->
+ case get(?PDICT_TX_ID_KEY) of
+ undefined ->
+ tx_attempt(Tx, Fun);
+ TxId when is_binary(TxId) ->
+ case erlfdb:wait(erlfdb:get(Tx, TxId)) of
+ <<>> ->
+ get(?PDICT_TX_RES_KEY);
+ not_found ->
+ tx_attempt(Tx, Fun)
+ end
+ end.
+
+
+tx_attempt(Tx, Fun) ->
+ Result = Fun(Tx),
+ TxId = tx_id(Tx),
+ ok = erlfdb:set(Tx, TxId, <<>>),
+ put(?PDICT_TX_RES_KEY, Result),
+ Result.
+
+
+tx_id(Tx) ->
+ Root = erlfdb_directory:root(),
+ CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]),
+ Prefix = erlfdb_directory:get_name(CouchDB),
+ {Mega, Secs, Micro} = os:timestamp(),
+ Key = {?TX_IDS, Mega, Secs, Micro, fabric2_util:uuid()},
+ erlfdb_tuple:pack(Key, Prefix).
diff --git a/src/fabric/src/fabric2_txid_cleaner.erl b/src/fabric/src/fabric2_txid_cleaner.erl
new file mode 100644
index 000000000..eaefbcc99
--- /dev/null
+++ b/src/fabric/src/fabric2_txid_cleaner.erl
@@ -0,0 +1,131 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric2_txid_cleaner).
+-behaviour(gen_server).
+-vsn(1).
+
+
+-export([
+ start_link/0,
+ remove/1
+]).
+
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+
+-include("fabric2.hrl").
+
+
+-define(ONE_HOUR, 3600000000).
+-define(MAX_TX_IDS, 1000).
+
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+remove(TxId) when is_binary(TxId) ->
+ gen_server:cast(?MODULE, {remove, TxId});
+
+remove(undefined) ->
+ ok.
+
+
+
+init(_) ->
+ {ok, #{
+ last_sweep => os:timestamp(),
+ txids => []
+ }}.
+
+
+terminate(_, #{txids := TxIds}) ->
+ if TxIds == [] -> ok; true ->
+ fabric2_fdb:transactional(fun(Tx) ->
+ lists:foreach(fun(TxId) ->
+ erlfdb:clear(Tx, TxId)
+ end)
+ end)
+ end,
+ ok.
+
+
+handle_call(Msg, _From, St) ->
+ {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast({remove, TxId}, St) ->
+ #{
+ last_sweep := LastSweep,
+ txidsd := TxIds
+ } = St,
+
+ NewTxIds = [TxId | TxIds],
+ NewSt = St#{txids := NewTxIds},
+
+ NeedsSweep = os:timer_diff(os:timestamp(), LastSweep) > ?ONE_HOUR,
+
+ case NeedsSweep orelse length(NewTxIds) >= ?MAX_TX_IDS of
+ true ->
+ {noreply, clean(NewSt, NeedsSweep)};
+ false ->
+ {noreply, NewSt}
+ end.
+
+
+handle_info(Msg, St) ->
+ {stop, {bad_info, Msg}, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
+
+
+clean(St, NeedsSweep) ->
+ #{
+ last_sweep := LastSweep,
+ txids := TxIds
+ } = St,
+ fabric2_fdb:transactional(fun(Tx) ->
+ lists:foreach(fun(TxId) ->
+ erlfdb:clear(Tx, TxId)
+ end, TxIds),
+ case NeedsSweep of
+ true ->
+ sweep(Tx, LastSweep),
+ St#{
+ last_sweep := os:timestamp(),
+ txids := []
+ };
+ false ->
+ St#{txids := []}
+ end
+ end).
+
+
+sweep(Tx, {Mega, Secs, Micro}) ->
+ Root = erlfdb_directory:root(),
+ CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]),
+ Prefix = erlfdb_directory:get_name(CouchDB),
+ StartKey = erlfdb_tuple:pack({?TX_IDS}, Prefix),
+ EndKey = erlfdb_tuple:pack({?TX_IDS, Mega, Secs, Micro}, Prefix),
+ erlfdb:set_option(Tx, next_write_no_write_conflict_range),
+ erlfdb:clear_range(Tx, StartKey, EndKey).
diff --git a/src/fabric/src/fabric2_util.erl b/src/fabric/src/fabric2_util.erl
index 021a0f7e7..8b3a3e3dd 100644
--- a/src/fabric/src/fabric2_util.erl
+++ b/src/fabric/src/fabric2_util.erl
@@ -14,9 +14,6 @@
-export([
- transactional/1,
- get_db_handle/0,
-
revinfo_to_path/1,
find_winning_revinfo/1,
@@ -35,25 +32,6 @@
-include_lib("couch/include/couch_db.hrl").
--define(PDICT_DB_KEY, '$erlfdb_handle').
-
-
-transactional(Fun) when is_function(Fun, 1) ->
- Db = get_db_handle(),
- erlfdb:transactional(Db, Fun).
-
-
-get_db_handle() ->
- case get(?PDICT_DB_KEY) of
- undefined ->
- {ok, Db} = application:get_env(fabric, db),
- put(?PDICT_DB_KEY, Db),
- Db;
- Db ->
- Db
- end.
-
-
revinfo_to_path(RevInfo) ->
#{
rev_id := {RevPos, Rev},