diff options
Diffstat (limited to 'src/rabbit_persister.erl')
-rw-r--r-- | src/rabbit_persister.erl | 29 |
1 files changed, 8 insertions, 21 deletions
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index 53335a6f..2729b838 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -40,7 +40,7 @@ -export([transaction/1, extend_transaction/2, dirty_work/1, commit_transaction/1, rollback_transaction/1, - force_snapshot/0, serial/0]). + force_snapshot/0]). -include("rabbit.hrl"). @@ -53,7 +53,7 @@ -define(MAX_WRAP_ENTRIES, 500). --define(PERSISTER_LOG_FORMAT_VERSION, {2, 5}). +-define(PERSISTER_LOG_FORMAT_VERSION, {2, 6}). -record(pstate, {log_handle, entry_count, deadline, pending_logs, pending_replies, @@ -64,7 +64,7 @@ %% the other maps a key to one or more queues. %% The aim is to reduce the overload of storing a message multiple times %% when it appears in several queues. --record(psnapshot, {serial, transactions, messages, queues, next_seq_id}). +-record(psnapshot, {transactions, messages, queues, next_seq_id}). %%---------------------------------------------------------------------------- @@ -83,7 +83,6 @@ -spec(commit_transaction/1 :: ({txn(), queue_name()}) -> 'ok'). -spec(rollback_transaction/1 :: ({txn(), queue_name()}) -> 'ok'). -spec(force_snapshot/0 :: () -> 'ok'). --spec(serial/0 :: () -> non_neg_integer()). -endif. @@ -116,17 +115,13 @@ rollback_transaction(TxnKey) -> force_snapshot() -> gen_server:call(?SERVER, force_snapshot, infinity). -serial() -> - gen_server:call(?SERVER, serial, infinity). - %%-------------------------------------------------------------------- init(_Args) -> process_flag(trap_exit, true), FileName = base_filename(), ok = filelib:ensure_dir(FileName), - Snapshot = #psnapshot{serial = 0, - transactions = dict:new(), + Snapshot = #psnapshot{transactions = dict:new(), messages = ets:new(messages, []), queues = ets:new(queues, []), next_seq_id = 0}, @@ -144,9 +139,7 @@ init(_Args) -> [Recovered, Bad]), LH end, - {Res, LoadedSnapshot} = internal_load_snapshot(LogHandle, Snapshot), - NewSnapshot = LoadedSnapshot#psnapshot{ - serial = LoadedSnapshot#psnapshot.serial + 1}, + {Res, NewSnapshot} = internal_load_snapshot(LogHandle, Snapshot), case Res of ok -> ok = take_snapshot(LogHandle, NewSnapshot); @@ -169,9 +162,6 @@ handle_call({commit_transaction, TxnKey}, From, State) -> do_noreply(internal_commit(From, TxnKey, State)); handle_call(force_snapshot, _From, State) -> do_reply(ok, flush(true, State)); -handle_call(serial, _From, - State = #pstate{snapshot = #psnapshot{serial = Serial}}) -> - do_reply(Serial, State); handle_call(_Request, _From, State) -> {noreply, State}. @@ -343,8 +333,7 @@ flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs, pending_logs = [], pending_replies = []}. -current_snapshot(_Snapshot = #psnapshot{serial = Serial, - transactions = Ts, +current_snapshot(_Snapshot = #psnapshot{transactions = Ts, messages = Messages, queues = Queues, next_seq_id = NextSeqId}) -> @@ -354,8 +343,7 @@ current_snapshot(_Snapshot = #psnapshot{serial = Serial, fun ({{_QName, PKey}, _Delivered, _SeqId}, S) -> sets:add_element(PKey, S) end, sets:new(), Queues)), - InnerSnapshot = {{serial, Serial}, - {txns, Ts}, + InnerSnapshot = {{txns, Ts}, {messages, ets:tab2list(Messages)}, {queues, ets:tab2list(Queues)}, {next_seq_id, NextSeqId}}, @@ -382,13 +370,12 @@ internal_load_snapshot(LogHandle, {K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start), case check_version(Loaded_Snapshot) of {ok, StateBin} -> - {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs}, + {{txns, Ts}, {messages, Ms}, {queues, Qs}, {next_seq_id, NextSeqId}} = binary_to_term(StateBin), true = ets:insert(Messages, Ms), true = ets:insert(Queues, Qs), Snapshot1 = replay(Items, LogHandle, K, Snapshot#psnapshot{ - serial = Serial, transactions = Ts, next_seq_id = NextSeqId}), Snapshot2 = requeue_messages(Snapshot1), |