diff options
Diffstat (limited to 'src/fabric/src/fabric2_txids.erl')
-rw-r--r-- | src/fabric/src/fabric2_txids.erl | 153 |
1 files changed, 153 insertions, 0 deletions
diff --git a/src/fabric/src/fabric2_txids.erl b/src/fabric/src/fabric2_txids.erl new file mode 100644 index 000000000..285e342ed --- /dev/null +++ b/src/fabric/src/fabric2_txids.erl @@ -0,0 +1,153 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric2_txids). +-behaviour(gen_server). +-vsn(1). + + +-export([ + start_link/0, + create/2, + remove/1 +]). + + +-export([ + init/1, + terminate/2, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3, + format_status/2 +]). + + +-include("fabric2.hrl"). + + +-define(ONE_HOUR, 3600000000). +-define(MAX_TX_IDS, 1000). + + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + + +create(Tx, undefined) -> + Prefix = fabric2_fdb:get_dir(Tx), + create(Tx, Prefix); + +create(_Tx, LayerPrefix) -> + {Mega, Secs, Micro} = os:timestamp(), + Key = {?TX_IDS, Mega, Secs, Micro, fabric2_util:uuid()}, + erlfdb_tuple:pack(Key, LayerPrefix). + + +remove(TxId) when is_binary(TxId) -> + gen_server:cast(?MODULE, {remove, TxId}); + +remove(undefined) -> + ok. + + + +init(_) -> + {ok, #{ + last_sweep => os:timestamp(), + txids => [] + }}. + + +terminate(_, #{txids := TxIds}) -> + if TxIds == [] -> ok; true -> + fabric2_fdb:transactional(fun(Tx) -> + lists:foreach(fun(TxId) -> + erlfdb:clear(Tx, TxId) + end, TxIds) + end) + end, + ok. + + +handle_call(Msg, _From, St) -> + {stop, {bad_call, Msg}, {bad_call, Msg}, St}. + + +handle_cast({remove, TxId}, St) -> + #{ + last_sweep := LastSweep, + txids := TxIds + } = St, + + NewTxIds = [TxId | TxIds], + NewSt = St#{txids := NewTxIds}, + + NeedsSweep = timer:now_diff(os:timestamp(), LastSweep) > ?ONE_HOUR, + + case NeedsSweep orelse length(NewTxIds) >= ?MAX_TX_IDS of + true -> + {noreply, clean(NewSt, NeedsSweep)}; + false -> + {noreply, NewSt} + end. + + +handle_info(Msg, St) -> + {stop, {bad_info, Msg}, St}. + + +code_change(_OldVsn, St, _Extra) -> + {ok, St}. + + +format_status(_Opt, [_PDict, State]) -> + #{ + txids := TxIds + } = State, + Scrubbed = State#{ + txids => {length, length(TxIds)} + }, + [{data, [{"State", + Scrubbed + }]}]. + + +clean(St, NeedsSweep) -> + #{ + last_sweep := LastSweep, + txids := TxIds + } = St, + fabric2_fdb:transactional(fun(Tx) -> + lists:foreach(fun(TxId) -> + erlfdb:clear(Tx, TxId) + end, TxIds), + case NeedsSweep of + true -> + sweep(Tx, LastSweep), + St#{ + last_sweep := os:timestamp(), + txids := [] + }; + false -> + St#{txids := []} + end + end). + + +sweep(Tx, {Mega, Secs, Micro}) -> + Prefix = fabric2_fdb:get_dir(Tx), + StartKey = erlfdb_tuple:pack({?TX_IDS}, Prefix), + EndKey = erlfdb_tuple:pack({?TX_IDS, Mega, Secs, Micro}, Prefix), + erlfdb:set_option(Tx, next_write_no_write_conflict_range), + erlfdb:clear_range(Tx, StartKey, EndKey). |