summaryrefslogtreecommitdiff
path: root/src/rabbit_persister.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_persister.erl')
-rw-r--r--src/rabbit_persister.erl29
1 files changed, 8 insertions, 21 deletions
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index 53335a6f..2729b838 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -40,7 +40,7 @@
-export([transaction/1, extend_transaction/2, dirty_work/1,
commit_transaction/1, rollback_transaction/1,
- force_snapshot/0, serial/0]).
+ force_snapshot/0]).
-include("rabbit.hrl").
@@ -53,7 +53,7 @@
-define(MAX_WRAP_ENTRIES, 500).
--define(PERSISTER_LOG_FORMAT_VERSION, {2, 5}).
+-define(PERSISTER_LOG_FORMAT_VERSION, {2, 6}).
-record(pstate, {log_handle, entry_count, deadline,
pending_logs, pending_replies,
@@ -64,7 +64,7 @@
%% the other maps a key to one or more queues.
%% The aim is to reduce the overload of storing a message multiple times
%% when it appears in several queues.
--record(psnapshot, {serial, transactions, messages, queues, next_seq_id}).
+-record(psnapshot, {transactions, messages, queues, next_seq_id}).
%%----------------------------------------------------------------------------
@@ -83,7 +83,6 @@
-spec(commit_transaction/1 :: ({txn(), queue_name()}) -> 'ok').
-spec(rollback_transaction/1 :: ({txn(), queue_name()}) -> 'ok').
-spec(force_snapshot/0 :: () -> 'ok').
--spec(serial/0 :: () -> non_neg_integer()).
-endif.
@@ -116,17 +115,13 @@ rollback_transaction(TxnKey) ->
force_snapshot() ->
gen_server:call(?SERVER, force_snapshot, infinity).
-serial() ->
- gen_server:call(?SERVER, serial, infinity).
-
%%--------------------------------------------------------------------
init(_Args) ->
process_flag(trap_exit, true),
FileName = base_filename(),
ok = filelib:ensure_dir(FileName),
- Snapshot = #psnapshot{serial = 0,
- transactions = dict:new(),
+ Snapshot = #psnapshot{transactions = dict:new(),
messages = ets:new(messages, []),
queues = ets:new(queues, []),
next_seq_id = 0},
@@ -144,9 +139,7 @@ init(_Args) ->
[Recovered, Bad]),
LH
end,
- {Res, LoadedSnapshot} = internal_load_snapshot(LogHandle, Snapshot),
- NewSnapshot = LoadedSnapshot#psnapshot{
- serial = LoadedSnapshot#psnapshot.serial + 1},
+ {Res, NewSnapshot} = internal_load_snapshot(LogHandle, Snapshot),
case Res of
ok ->
ok = take_snapshot(LogHandle, NewSnapshot);
@@ -169,9 +162,6 @@ handle_call({commit_transaction, TxnKey}, From, State) ->
do_noreply(internal_commit(From, TxnKey, State));
handle_call(force_snapshot, _From, State) ->
do_reply(ok, flush(true, State));
-handle_call(serial, _From,
- State = #pstate{snapshot = #psnapshot{serial = Serial}}) ->
- do_reply(Serial, State);
handle_call(_Request, _From, State) ->
{noreply, State}.
@@ -343,8 +333,7 @@ flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs,
pending_logs = [],
pending_replies = []}.
-current_snapshot(_Snapshot = #psnapshot{serial = Serial,
- transactions = Ts,
+current_snapshot(_Snapshot = #psnapshot{transactions = Ts,
messages = Messages,
queues = Queues,
next_seq_id = NextSeqId}) ->
@@ -354,8 +343,7 @@ current_snapshot(_Snapshot = #psnapshot{serial = Serial,
fun ({{_QName, PKey}, _Delivered, _SeqId}, S) ->
sets:add_element(PKey, S)
end, sets:new(), Queues)),
- InnerSnapshot = {{serial, Serial},
- {txns, Ts},
+ InnerSnapshot = {{txns, Ts},
{messages, ets:tab2list(Messages)},
{queues, ets:tab2list(Queues)},
{next_seq_id, NextSeqId}},
@@ -382,13 +370,12 @@ internal_load_snapshot(LogHandle,
{K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start),
case check_version(Loaded_Snapshot) of
{ok, StateBin} ->
- {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs},
+ {{txns, Ts}, {messages, Ms}, {queues, Qs},
{next_seq_id, NextSeqId}} = binary_to_term(StateBin),
true = ets:insert(Messages, Ms),
true = ets:insert(Queues, Qs),
Snapshot1 = replay(Items, LogHandle, K,
Snapshot#psnapshot{
- serial = Serial,
transactions = Ts,
next_seq_id = NextSeqId}),
Snapshot2 = requeue_messages(Snapshot1),