summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit_backing_queue_spec.hrl9
-rw-r--r--src/rabbit_amqqueue.erl34
-rw-r--r--src/rabbit_amqqueue_process.erl108
-rw-r--r--src/rabbit_backing_queue.erl20
-rw-r--r--src/rabbit_mirror_queue_master.erl84
-rw-r--r--src/rabbit_mirror_queue_slave.erl66
-rw-r--r--src/rabbit_tests.erl16
-rw-r--r--src/rabbit_variable_queue.erl214
8 files changed, 308 insertions, 243 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index d41fcb17..b85e4ad6 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -25,10 +25,13 @@
-type(message_properties_transformer() ::
fun ((rabbit_types:message_properties())
-> rabbit_types:message_properties())).
+-type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
+-type(sync_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok' | 'error')).
-spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok').
-spec(stop/0 :: () -> 'ok').
--spec(init/2 :: (rabbit_types:amqqueue(), attempt_recovery()) -> state()).
+-spec(init/4 :: (rabbit_types:amqqueue(), attempt_recovery(),
+ async_callback(), sync_callback()) -> state()).
-spec(terminate/1 :: (state()) -> state()).
-spec(delete_and_terminate/1 :: (state()) -> state()).
-spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
@@ -41,6 +44,7 @@
(false, rabbit_types:basic_message(),
rabbit_types:message_properties(), pid(), state())
-> {undefined, state()}).
+-spec(drain_confirmed/1 :: (state()) -> {[rabbit_guid:guid()], state()}).
-spec(dropwhile/2 ::
(fun ((rabbit_types:message_properties()) -> boolean()), state())
-> state()).
@@ -66,7 +70,6 @@
-spec(idle_timeout/1 :: (state()) -> state()).
-spec(handle_pre_hibernate/1 :: (state()) -> state()).
-spec(status/1 :: (state()) -> [{atom(), any()}]).
--spec(invoke/3 :: (atom(), fun ((A) -> A), state()) ->
- {[rabbit_guid:guid()], state()}).
+-spec(invoke/3 :: (atom(), fun ((atom(), A) -> A), state()) -> state()).
-spec(validate_message/2 :: (rabbit_types:basic_message(), state()) ->
{'invalid' | 'valid', state()}).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 36b1662e..9820567c 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -17,17 +17,11 @@
-module(rabbit_amqqueue).
-export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]).
--export([internal_declare/2, internal_delete/1,
- maybe_run_queue_via_backing_queue/3,
- maybe_run_queue_via_backing_queue_async/3,
- sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2,
- set_maximum_since_use/2, maybe_expire/1, drop_expired/1]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, requeue/3, ack/4, reject/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
--export([emit_stats/1]).
-export([consumers/1, consumers_all/1]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
-export([notify_sent/2, unblock/2, flush_all/2]).
@@ -35,6 +29,14 @@
-export([on_node_down/1]).
-export([store_queue/1]).
+
+%% internal
+-export([internal_declare/2, internal_delete/1,
+ run_backing_queue/3, run_backing_queue_async/3,
+ sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2,
+ set_maximum_since_use/2, maybe_expire/1, drop_expired/1,
+ emit_stats/1]).
+
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
@@ -141,10 +143,12 @@
rabbit_types:connection_exit() |
fun ((boolean()) -> rabbit_types:ok_or_error('not_found') |
rabbit_types:connection_exit())).
--spec(maybe_run_queue_via_backing_queue/3 ::
- (pid(), atom(), (fun ((A) -> {[rabbit_guid:msg_id()], A}))) -> 'ok').
--spec(maybe_run_queue_via_backing_queue_async/3 ::
- (pid(), atom(), (fun ((A) -> {[rabbit_guid:msg_id()], A}))) -> 'ok').
+-spec(run_backing_queue/3 ::
+ (pid(), atom(),
+ (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
+-spec(run_backing_queue_async/3 ::
+ (pid(), atom(),
+ (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
-spec(sync_timeout/1 :: (pid()) -> 'ok').
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
@@ -440,13 +444,11 @@ internal_delete(QueueName) ->
end
end).
+run_backing_queue(QPid, Mod, Fun) ->
+ gen_server2:call(QPid, {run_backing_queue, Mod, Fun}, infinity).
-maybe_run_queue_via_backing_queue(QPid, Mod, Fun) ->
- gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Mod, Fun},
- infinity).
-
-maybe_run_queue_via_backing_queue_async(QPid, Mod, Fun) ->
- gen_server2:cast(QPid, {maybe_run_queue_via_backing_queue, Mod, Fun}).
+run_backing_queue_async(QPid, Mod, Fun) ->
+ gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}).
sync_timeout(QPid) ->
gen_server2:cast(QPid, sync_timeout).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d8cd510b..5aedb630 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -178,7 +178,7 @@ declare(Recover, From,
ok = rabbit_memory_monitor:register(
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
- BQS = BQ:init(Q, Recover),
+ BQS = bq_init(BQ, Q, Recover),
State1 = process_args(State#q{backing_queue_state = BQS}),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
@@ -188,6 +188,20 @@ declare(Recover, From,
Q1 -> {stop, normal, {existing, Q1}, State}
end.
+bq_init(BQ, Q, Recover) ->
+ Self = self(),
+ BQ:init(Q, Recover,
+ fun (Mod, Fun) ->
+ rabbit_amqqueue:run_backing_queue_async(Self, Mod, Fun)
+ end,
+ fun (Mod, Fun) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> error end,
+ fun () ->
+ rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
+ end)
+ end).
+
process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
lists:foldl(fun({Arg, Fun}, State1) ->
case rabbit_misc:table_lookup(Arguments, Arg) of
@@ -230,13 +244,15 @@ noreply(NewState) ->
{NewState1, Timeout} = next_state(NewState),
{noreply, NewState1, Timeout}.
-next_state(State) ->
- State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- ensure_rate_timer(State),
- State2 = ensure_stats_timer(State1),
- case BQ:needs_idle_timeout(BQS) of
- true -> {ensure_sync_timer(State2), 0};
- false -> {stop_sync_timer(State2), hibernate}
+next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ {MsgIds, BQS1} = BQ:drain_confirmed(BQS),
+ State1 = ensure_stats_timer(
+ ensure_rate_timer(
+ confirm_messages(MsgIds, State#q{
+ backing_queue_state = BQS1}))),
+ case BQ:needs_idle_timeout(BQS1) of
+ true -> {ensure_sync_timer(State1), 0};
+ false -> {stop_sync_timer(State1), hibernate}
end.
backing_queue_module(#amqqueue{arguments = Args}) ->
@@ -435,6 +451,8 @@ deliver_from_queue_deliver(AckRequired, false, State) ->
fetch(AckRequired, State),
{{Message, IsDelivered, AckTag}, 0 == Remaining, State1}.
+confirm_messages([], State) ->
+ State;
confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
{CMs, MTC1} = lists:foldl(
fun(MsgId, {CMs, MTC0}) ->
@@ -548,12 +566,12 @@ deliver_or_enqueue(Delivery, State) ->
ensure_ttl_timer(State1#q{backing_queue_state = BQS1})
end.
-requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl = TTL}) ->
- maybe_run_queue_via_backing_queue(
- BQ, fun (BQS) ->
- {_Guids, BQS1} =
- BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS),
- {[], BQS1}
+requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) ->
+ run_backing_queue(
+ BQ, fun (M, BQS) ->
+ {_MsgIds, BQS1} =
+ M:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS),
+ BQS1
end, State).
fetch(AckRequired, State = #q{backing_queue_state = BQS,
@@ -657,15 +675,11 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
backing_queue_idle_timeout(State = #q{backing_queue = BQ}) ->
- maybe_run_queue_via_backing_queue(
- BQ, fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State).
+ run_backing_queue(BQ, fun (M, BQS) -> M:idle_timeout(BQS) end, State).
-maybe_run_queue_via_backing_queue(Mod, Fun,
- State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {MsgIds, BQS1} = BQ:invoke(Mod, Fun, BQS),
- run_message_queue(
- confirm_messages(MsgIds, State#q{backing_queue_state = BQS1})).
+run_backing_queue(Mod, Fun, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ run_message_queue(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}).
commit_transaction(Txn, From, C = #cr{acktags = ChAckTags},
State = #q{backing_queue = BQ,
@@ -798,29 +812,29 @@ emit_consumer_deleted(ChPid, ConsumerTag) ->
prioritise_call(Msg, _From, _State) ->
case Msg of
- info -> 9;
- {info, _Items} -> 9;
- consumers -> 9;
- {maybe_run_queue_via_backing_queue, _Mod, _Fun} -> 6;
- _ -> 0
+ info -> 9;
+ {info, _Items} -> 9;
+ consumers -> 9;
+ {run_backing_queue, _Mod, _Fun} -> 6;
+ _ -> 0
end.
prioritise_cast(Msg, _State) ->
case Msg of
- update_ram_duration -> 8;
- delete_immediately -> 8;
- {set_ram_duration_target, _Duration} -> 8;
- {set_maximum_since_use, _Age} -> 8;
- maybe_expire -> 8;
- drop_expired -> 8;
- emit_stats -> 7;
- {ack, _Txn, _AckTags, _ChPid} -> 7;
- {reject, _AckTags, _Requeue, _ChPid} -> 7;
- {notify_sent, _ChPid} -> 7;
- {unblock, _ChPid} -> 7;
- {maybe_run_queue_via_backing_queue, _Mod, _Fun} -> 6;
- sync_timeout -> 6;
- _ -> 0
+ update_ram_duration -> 8;
+ delete_immediately -> 8;
+ {set_ram_duration_target, _Duration} -> 8;
+ {set_maximum_since_use, _Age} -> 8;
+ maybe_expire -> 8;
+ drop_expired -> 8;
+ emit_stats -> 7;
+ {ack, _Txn, _AckTags, _ChPid} -> 7;
+ {reject, _AckTags, _Requeue, _ChPid} -> 7;
+ {notify_sent, _ChPid} -> 7;
+ {unblock, _ChPid} -> 7;
+ {run_backing_queue, _Mod, _Fun} -> 6;
+ sync_timeout -> 6;
+ _ -> 0
end.
prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
@@ -837,14 +851,14 @@ handle_call({init, Recover}, From,
true -> erlang:monitor(process, Owner),
declare(Recover, From, State);
false -> #q{backing_queue = BQ, backing_queue_state = undefined,
- q = #amqqueue{name = QName, durable = IsDurable}} = State,
+ q = #amqqueue{name = QName} = Q} = State,
gen_server2:reply(From, not_found),
case Recover of
true -> ok;
_ -> rabbit_log:warning(
"Queue ~p exclusive owner went away~n", [QName])
end,
- BQS = BQ:init(QName, IsDurable, Recover),
+ BQS = bq_init(BQ, Q, Recover),
%% Rely on terminate to delete the queue.
{stop, normal, State#q{backing_queue_state = BQS}}
end;
@@ -1032,12 +1046,12 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
noreply(requeue_and_run(AckTags, State))
end;
-handle_call({maybe_run_queue_via_backing_queue, Mod, Fun}, _From, State) ->
- reply(ok, maybe_run_queue_via_backing_queue(Mod, Fun, State)).
+handle_call({run_backing_queue, Mod, Fun}, _From, State) ->
+ reply(ok, run_backing_queue(Mod, Fun, State)).
-handle_cast({maybe_run_queue_via_backing_queue, Mod, Fun}, State) ->
- noreply(maybe_run_queue_via_backing_queue(Mod, Fun, State));
+handle_cast({run_backing_queue, Mod, Fun}, State) ->
+ noreply(run_backing_queue(Mod, Fun, State));
handle_cast(sync_timeout, State) ->
noreply(backing_queue_idle_timeout(State#q{sync_timer_ref = undefined}));
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 726b9bef..ce6143dd 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -33,7 +33,21 @@ behaviour_info(callbacks) ->
{stop, 0},
%% Initialise the backing queue and its state.
- {init, 2},
+ %%
+ %% Takes
+ %% 1. the amqqueue record
+ %% 2. a boolean indicating whether the queue is an existing queue
+ %% that should be recovered
+ %% 3. an asynchronous callback which accepts a function from
+ %% state to state and invokes it with the current backing
+ %% queue state. This is useful for handling events, e.g. when
+ %% the backing queue does not have its own process to receive
+ %% such events, or when the processing of an event results in
+ %% a state transition the queue logic needs to know about
+ %% (such as messages getting confirmed).
+ %% 4. a synchronous callback. Same as the asynchronous callback
+ %% but waits for completion and returns 'error' on error.
+ {init, 4},
%% Called on queue shutdown when queue isn't being deleted.
{terminate, 1},
@@ -54,6 +68,10 @@ behaviour_info(callbacks) ->
%% (i.e. saves the round trip through the backing queue).
{publish_delivered, 5},
+ %% Return ids of messages which have been confirmed since
+ %% the last invocation of this function (or initialisation).
+ {drain_confirmed, 1},
+
%% Drop messages from the head of the queue while the supplied
%% predicate returns true.
{dropwhile, 2},
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 25a1e4b8..0ca73f03 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -16,10 +16,10 @@
-module(rabbit_mirror_queue_master).
--export([init/2, terminate/1, delete_and_terminate/1,
+-export([init/4, terminate/1, delete_and_terminate/1,
purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
- requeue/3, len/1, is_empty/1, dropwhile/2,
+ requeue/3, len/1, is_empty/1, drain_confirmed/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, validate_message/2]).
@@ -37,7 +37,8 @@
backing_queue,
backing_queue_state,
set_delivered,
- seen_status
+ seen_status,
+ confirmed
}).
%% ---------------------------------------------------------------------------
@@ -53,7 +54,8 @@ stop() ->
%% Same as start/1.
exit({not_valid_for_generic_backing_queue, ?MODULE}).
-init(#amqqueue { arguments = Args, name = QName } = Q, Recover) ->
+init(#amqqueue { arguments = Args, name = QName } = Q, Recover,
+ AsyncCallback, SyncCallback) ->
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, undefined),
GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
{_Type, Nodes} = rabbit_misc:table_lookup(Args, <<"x-mirror">>),
@@ -64,13 +66,14 @@ init(#amqqueue { arguments = Args, name = QName } = Q, Recover) ->
end,
[rabbit_mirror_queue_misc:add_slave(QName, Node) || Node <- Nodes1],
{ok, BQ} = application:get_env(backing_queue_module),
- BQS = BQ:init(Q, Recover),
+ BQS = BQ:init(Q, Recover, AsyncCallback, SyncCallback),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
set_delivered = 0,
- seen_status = dict:new() }.
+ seen_status = dict:new(),
+ confirmed = [] }.
promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus) ->
#state { gm = GM,
@@ -78,7 +81,8 @@ promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus) ->
backing_queue = BQ,
backing_queue_state = BQS,
set_delivered = BQ:len(BQS),
- seen_status = SeenStatus }.
+ seen_status = SeenStatus,
+ confirmed = [] }.
terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
%% Backing queue termination. The queue is going down but
@@ -138,6 +142,35 @@ dropwhile(Fun, State = #state { gm = GM,
State #state { backing_queue_state = BQS1,
set_delivered = SetDelivered1 }.
+drain_confirmed(State = #state { backing_queue = BQ,
+ backing_queue_state = BQS,
+ seen_status = SS,
+ confirmed = Confirmed }) ->
+ {MsgIds, BQS1} = BQ:drain_confirmed(BQS),
+ {MsgIds1, SS1} =
+ lists:foldl(
+ fun (MsgId, {MsgIdsN, SSN}) ->
+ case dict:find(MsgId, SSN) of
+ error ->
+ {[MsgId | MsgIdsN], SSN};
+ {ok, published} ->
+ %% It was published when we were a slave,
+ %% and we were promoted before we saw the
+ %% publish from the channel. We still
+ %% haven't seen the channel publish, and
+ %% consequently we need to filter out the
+ %% confirm here. We will issue the confirm
+ %% when we see the publish from the channel.
+ {MsgIdsN, dict:store(MsgId, confirmed, SSN)};
+ {ok, confirmed} ->
+ %% Well, confirms are racy by definition.
+ {[MsgId | MsgIdsN], SSN}
+ end
+ end, {[], SS}, MsgIds),
+ {Confirmed ++ MsgIds1, State #state { backing_queue_state = BQS1,
+ seen_status = SS1,
+ confirmed = [] }}.
+
fetch(AckRequired, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
@@ -236,38 +269,16 @@ status(#state { backing_queue = BQ, backing_queue_state = BQS}) ->
BQ:status(BQS).
invoke(?MODULE, Fun, State) ->
- Fun(State);
+ Fun(?MODULE, State);
invoke(Mod, Fun, State = #state { backing_queue = BQ,
- backing_queue_state = BQS,
- seen_status = SS }) ->
- {MsgIds, BQS1} = BQ:invoke(Mod, Fun, BQS),
- {MsgIds1, SS1} =
- lists:foldl(
- fun (MsgId, {MsgIdsN, SSN}) ->
- case dict:find(MsgId, SSN) of
- error ->
- {[MsgId | MsgIdsN], SSN};
- {ok, published} ->
- %% It was published when we were a slave,
- %% and we were promoted before we saw the
- %% publish from the channel. We still
- %% haven't seen the channel publish, and
- %% consequently we need to filter out the
- %% confirm here. We will issue the confirm
- %% when we see the publish from the channel.
- {MsgIdsN, dict:store(MsgId, confirmed, SSN)};
- {ok, confirmed} ->
- %% Well, confirms are racy by definition.
- {[MsgId | MsgIdsN], SSN}
- end
- end, {[], SS}, MsgIds),
- {MsgIds1, State #state { backing_queue_state = BQS1,
- seen_status = SS1 }}.
+ backing_queue_state = BQS }) ->
+ State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.
validate_message(Message = #basic_message { id = MsgId },
State = #state { seen_status = SS,
backing_queue = BQ,
- backing_queue_state = BQS }) ->
+ backing_queue_state = BQS,
+ confirmed = Confirmed }) ->
%% Here, we need to deal with the possibility that we're about to
%% receive a message that we've already seen when we were a slave
%% (we received it via gm). Thus if we do receive such message now
@@ -299,7 +310,6 @@ validate_message(Message = #basic_message { id = MsgId },
%% need to confirm now. As above, amqqueue_process will
%% have the entry for the msg_id_to_channel mapping added
%% immediately prior to calling validate_message/2.
- ok = rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- self(), ?MODULE, fun (State1) -> {[MsgId], State1} end),
- {invalid, State #state { seen_status = dict:erase(MsgId, SS) }}
+ {invalid, State #state { seen_status = dict:erase(MsgId, SS),
+ confirmed = [MsgId | Confirmed] }}
end.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 064dc329..d20b00d4 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -94,7 +94,7 @@ init([#amqqueue { name = QueueName } = Q]) ->
ok = rabbit_memory_monitor:register(
self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}),
{ok, BQ} = application:get_env(backing_queue_module),
- BQS = BQ:init(Q, false),
+ BQS = bq_init(BQ, Q, false),
{ok, #state { q = Q,
gm = GM,
master_node = node(MPid),
@@ -154,12 +154,12 @@ handle_call({gm_deaths, Deaths}, From,
{stop, normal, State}
end;
-handle_call({maybe_run_queue_via_backing_queue, Mod, Fun}, _From, State) ->
- reply(ok, maybe_run_queue_via_backing_queue(Mod, Fun, State)).
+handle_call({run_backing_queue, Mod, Fun}, _From, State) ->
+ reply(ok, run_backing_queue(Mod, Fun, State)).
-handle_cast({maybe_run_queue_via_backing_queue, Mod, Fun}, State) ->
- noreply(maybe_run_queue_via_backing_queue(Mod, Fun, State));
+handle_cast({run_backing_queue, Mod, Fun}, State) ->
+ noreply(run_backing_queue(Mod, Fun, State));
handle_cast({gm, Instruction}, State) ->
handle_process_result(process_instruction(Instruction, State));
@@ -235,20 +235,20 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
prioritise_call(Msg, _From, _State) ->
case Msg of
- {maybe_run_queue_via_backing_queue, _Mod, _Fun} -> 6;
- {gm_deaths, _Deaths} -> 5;
- _ -> 0
+ {run_backing_queue, _Mod, _Fun} -> 6;
+ {gm_deaths, _Deaths} -> 5;
+ _ -> 0
end.
prioritise_cast(Msg, _State) ->
case Msg of
- update_ram_duration -> 8;
- {set_ram_duration_target, _Duration} -> 8;
- {set_maximum_since_use, _Age} -> 8;
- {maybe_run_queue_via_backing_queue, _Mod, _Fun} -> 6;
- sync_timeout -> 6;
- {gm, _Msg} -> 5;
- _ -> 0
+ update_ram_duration -> 8;
+ {set_ram_duration_target, _Duration} -> 8;
+ {set_maximum_since_use, _Age} -> 8;
+ {run_backing_queue, _Mod, _Fun} -> 6;
+ sync_timeout -> 6;
+ {gm, _Msg} -> 5;
+ _ -> 0
end.
%% ---------------------------------------------------------------------------
@@ -282,12 +282,23 @@ handle_msg([SPid], _From, Msg) ->
%% Others
%% ---------------------------------------------------------------------------
-maybe_run_queue_via_backing_queue(
- Mod, Fun, State = #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
- {MsgIds, BQS1} = BQ:invoke(Mod, Fun, BQS),
- confirm_messages(MsgIds, State #state { backing_queue_state = BQS1 }).
-
+bq_init(BQ, Q, Recover) ->
+ Self = self(),
+ BQ:init(Q, Recover,
+ fun (Mod, Fun) ->
+ rabbit_amqqueue:run_backing_queue_async(Self, Mod, Fun)
+ end,
+ fun (Mod, Fun) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> error end,
+ fun () ->
+ rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
+ end)
+ end).
+
+run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.
needs_confirming(#delivery{ msg_seq_no = undefined }, _State) ->
never;
@@ -430,18 +441,19 @@ reply(Reply, State) ->
{NewState, Timeout} = next_state(State),
{reply, Reply, NewState, Timeout}.
-next_state(State) ->
- State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
- ensure_rate_timer(State),
- case BQ:needs_idle_timeout(BQS) of
+next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) ->
+ {MsgIds, BQS1} = BQ:drain_confirmed(BQS),
+ State1 = ensure_rate_timer(
+ confirm_messages(MsgIds, State #state {
+ backing_queue_state = BQS1 })),
+ case BQ:needs_idle_timeout(BQS1) of
true -> {ensure_sync_timer(State1), 0};
false -> {stop_sync_timer(State1), hibernate}
end.
%% copied+pasted from amqqueue_process
backing_queue_idle_timeout(State = #state { backing_queue = BQ }) ->
- maybe_run_queue_via_backing_queue(
- BQ, fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State).
+ run_backing_queue(BQ, fun (M, BQS) -> M:idle_timeout(BQS) end, State).
ensure_sync_timer(State = #state { sync_timer_ref = undefined }) ->
{ok, TRef} = timer:apply_after(
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index e9b8a020..6f5abe3e 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2080,6 +2080,10 @@ test_queue_index() ->
passed.
+variable_queue_init(Q, Recover) ->
+ rabbit_variable_queue:init(
+ Q, Recover, fun nop/1, fun nop/1, fun nop/2, fun nop/1).
+
variable_queue_publish(IsPersistent, Count, VQ) ->
lists:foldl(
fun (_N, VQN) ->
@@ -2114,8 +2118,7 @@ test_amqqueue(Durable) ->
with_fresh_variable_queue(Fun) ->
ok = empty_test_queue(),
- VQ = rabbit_variable_queue:init(test_amqqueue(true), false,
- fun nop/2, fun nop/1),
+ VQ = variable_queue_init(test_amqqueue(true), false),
S0 = rabbit_variable_queue:status(VQ),
assert_props(S0, [{q1, 0}, {q2, 0},
{delta, {delta, undefined, 0, undefined}},
@@ -2290,8 +2293,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
{VQ5, _AckTags1} = variable_queue_fetch(Count, false, false,
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = rabbit_variable_queue:init(test_amqqueue(true), true,
- fun nop/2, fun nop/1),
+ VQ7 = variable_queue_init(test_amqqueue(true), true),
{{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
VQ9 = variable_queue_publish(false, 1, VQ8),
@@ -2308,8 +2310,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
VQ5 = rabbit_variable_queue:idle_timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = rabbit_variable_queue:init(test_amqqueue(true), true,
- fun nop/2, fun nop/1),
+ VQ7 = variable_queue_init(test_amqqueue(true), true),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
@@ -2340,8 +2341,7 @@ test_queue_recover() ->
{ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
- VQ1 = rabbit_variable_queue:init(Q, true,
- fun nop/2, fun nop/1),
+ VQ1 = variable_queue_init(Q, true),
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
_VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c9d96db7..9704668e 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,9 +16,9 @@
-module(rabbit_variable_queue).
--export([init/2, terminate/1, delete_and_terminate/1,
- purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
- tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
+-export([init/4, terminate/1, delete_and_terminate/1,
+ purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
+ fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
@@ -27,7 +27,7 @@
-export([start/1, stop/0]).
%% exported for testing only
--export([start_msg_store/2, stop_msg_store/0, init/4]).
+-export([start_msg_store/2, stop_msg_store/0, init/6]).
%%----------------------------------------------------------------------------
%% Definitions:
@@ -238,6 +238,9 @@
durable,
transient_threshold,
+ async_callback,
+ sync_callback,
+
len,
persistent_count,
@@ -252,6 +255,7 @@
msgs_on_disk,
msg_indices_on_disk,
unconfirmed,
+ confirmed,
ack_out_counter,
ack_in_counter,
ack_rates
@@ -332,11 +336,14 @@
{any(), binary()}},
on_sync :: sync(),
durable :: boolean(),
+ transient_threshold :: non_neg_integer(),
+
+ async_callback :: async_callback(),
+ sync_callback :: sync_callback(),
len :: non_neg_integer(),
persistent_count :: non_neg_integer(),
- transient_threshold :: non_neg_integer(),
target_ram_count :: non_neg_integer() | 'infinity',
ram_msg_count :: non_neg_integer(),
ram_msg_count_prev :: non_neg_integer(),
@@ -347,6 +354,7 @@
msgs_on_disk :: gb_set(),
msg_indices_on_disk :: gb_set(),
unconfirmed :: gb_set(),
+ confirmed :: gb_set(),
ack_out_counter :: non_neg_integer(),
ack_in_counter :: non_neg_integer(),
ack_rates :: rates() }).
@@ -397,27 +405,26 @@ stop_msg_store() ->
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
-init(Queue, Recover) ->
- Self = self(),
- init(Queue, Recover,
+init(Queue, Recover, AsyncCallback, SyncCallback) ->
+ init(Queue, Recover, AsyncCallback, SyncCallback,
fun (MsgIds, ActionTaken) ->
- msgs_written_to_disk(Self, MsgIds, ActionTaken)
+ msgs_written_to_disk(AsyncCallback, MsgIds, ActionTaken)
end,
- fun (MsgIds) -> msg_indices_written_to_disk(Self, MsgIds) end).
+ fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end).
init(#amqqueue { name = QueueName, durable = IsDurable }, false,
- MsgOnDiskFun, MsgIdxOnDiskFun) ->
+ AsyncCallback, SyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
- init(IsDurable, IndexState, 0, [],
+ init(IsDurable, IndexState, 0, [], AsyncCallback, SyncCallback,
case IsDurable of
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
- MsgOnDiskFun);
+ MsgOnDiskFun, AsyncCallback);
false -> undefined
end,
- msg_store_client_init(?TRANSIENT_MSG_STORE, undefined));
+ msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback));
-init(#amqqueue { name = QueueName }, true,
- MsgOnDiskFun, MsgIdxOnDiskFun) ->
+init(#amqqueue { name = QueueName, durable = true }, true,
+ AsyncCallback, SyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
Terms = rabbit_queue_index:shutdown_terms(QueueName),
{PRef, TRef, Terms1} =
case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of
@@ -427,9 +434,9 @@ init(#amqqueue { name = QueueName }, true,
_ -> {rabbit_guid:guid(), rabbit_guid:guid(), []}
end,
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
- MsgOnDiskFun),
+ MsgOnDiskFun, AsyncCallback),
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, TRef,
- undefined),
+ undefined, AsyncCallback),
{DeltaCount, IndexState} =
rabbit_queue_index:recover(
QueueName, Terms1,
@@ -438,7 +445,7 @@ init(#amqqueue { name = QueueName }, true,
rabbit_msg_store:contains(MsgId, PersistentClient)
end,
MsgIdxOnDiskFun),
- init(true, IndexState, DeltaCount, Terms1,
+ init(true, IndexState, DeltaCount, Terms1, AsyncCallback, SyncCallback,
PersistentClient, TransientClient).
terminate(State) ->
@@ -513,9 +520,10 @@ publish(Msg, MsgProps, _ChPid, State) ->
publish_delivered(false, #basic_message { id = MsgId },
#message_properties { needs_confirming = NeedsConfirming },
- _ChPid, State = #vqstate { len = 0 }) ->
+ _ChPid, State = #vqstate { async_callback = Callback,
+ len = 0 }) ->
case NeedsConfirming of
- true -> blind_confirm(self(), gb_sets:singleton(MsgId));
+ true -> blind_confirm(Callback, gb_sets:singleton(MsgId));
false -> ok
end,
{undefined, a(State)};
@@ -523,14 +531,13 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
id = MsgId },
MsgProps = #message_properties {
needs_confirming = NeedsConfirming },
- _ChPid,
- State = #vqstate { len = 0,
- next_seq_id = SeqId,
- out_counter = OutCount,
- in_counter = InCount,
- persistent_count = PCount,
- durable = IsDurable,
- unconfirmed = UC }) ->
+ _ChPid, State = #vqstate { len = 0,
+ next_seq_id = SeqId,
+ out_counter = OutCount,
+ in_counter = InCount,
+ persistent_count = PCount,
+ durable = IsDurable,
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
#msg_status { is_delivered = true },
@@ -545,6 +552,9 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
persistent_count = PCount1,
unconfirmed = UC1 }))}.
+drain_confirmed(State = #vqstate { confirmed = C }) ->
+ {gb_sets:to_list(C), State #vqstate { confirmed = gb_sets:new() }}.
+
dropwhile(Pred, State) ->
{_OkOrEmpty, State1} = dropwhile1(Pred, State),
a(State1).
@@ -689,6 +699,8 @@ tx_rollback(Txn, State = #vqstate { durable = IsDurable,
tx_commit(Txn, Fun, MsgPropsFun,
State = #vqstate { durable = IsDurable,
+ async_callback = AsyncCallback,
+ sync_callback = SyncCallback,
msg_store_clients = MSCState }) ->
#tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
erase_tx(Txn),
@@ -697,10 +709,13 @@ tx_commit(Txn, Fun, MsgPropsFun,
HasPersistentPubs = PersistentMsgIds =/= [],
{AckTags1,
a(case IsDurable andalso HasPersistentPubs of
- true -> ok = msg_store_sync(
- MSCState, true, PersistentMsgIds,
- msg_store_callback(PersistentMsgIds, Pubs, AckTags1,
- Fun, MsgPropsFun)),
+ true -> MsgStoreCallback =
+ fun () -> msg_store_callback(
+ PersistentMsgIds, Pubs, AckTags1, Fun,
+ MsgPropsFun, AsyncCallback, SyncCallback)
+ end,
+ ok = msg_store_sync(MSCState, true, PersistentMsgIds,
+ fun () -> spawn(MsgStoreCallback) end),
State;
false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1,
Fun, MsgPropsFun, State)
@@ -866,7 +881,7 @@ status(#vqstate {
{avg_ack_egress_rate , AvgAckEgressRate} ].
invoke(?MODULE, Fun, State) ->
- Fun(State).
+ Fun(?MODULE, State).
validate_message(_Msg, State) -> {valid, State}.
@@ -939,13 +954,13 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
end),
Res.
-msg_store_client_init(MsgStore, MsgOnDiskFun) ->
- msg_store_client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun).
+msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) ->
+ msg_store_client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun, Callback).
-msg_store_client_init(MsgStore, Ref, MsgOnDiskFun) ->
- rabbit_msg_store:client_init(
- MsgStore, Ref, MsgOnDiskFun,
- msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE)).
+msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) ->
+ CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
+ rabbit_msg_store:client_init(MsgStore, Ref, MsgOnDiskFun,
+ fun () -> Callback(?MODULE, CloseFDsFun) end).
msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
with_immutable_msg_store_state(
@@ -978,15 +993,9 @@ msg_store_close_fds(MSCState, IsPersistent) ->
fun (MSCState1) -> rabbit_msg_store:close_all_indicated(MSCState1) end).
msg_store_close_fds_fun(IsPersistent) ->
- Self = self(),
- fun () ->
- rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- Self, ?MODULE,
- fun (State = #vqstate { msg_store_clients = MSCState }) ->
- {ok, MSCState1} =
- msg_store_close_fds(MSCState, IsPersistent),
- {[], State #vqstate { msg_store_clients = MSCState1 }}
- end)
+ fun (?MODULE, State = #vqstate { msg_store_clients = MSCState }) ->
+ {ok, MSCState1} = msg_store_close_fds(MSCState, IsPersistent),
+ State #vqstate { msg_store_clients = MSCState1 }
end.
maybe_write_delivered(false, _SeqId, IndexState) ->
@@ -1072,7 +1081,7 @@ update_rate(Now, Then, Count, {OThen, OCount}) ->
%%----------------------------------------------------------------------------
init(IsDurable, IndexState, DeltaCount, Terms,
- PersistentClient, TransientClient) ->
+ AsyncCallback, SyncCallback, PersistentClient, TransientClient) ->
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount),
@@ -1098,6 +1107,9 @@ init(IsDurable, IndexState, DeltaCount, Terms,
durable = IsDurable,
transient_threshold = NextSeqId,
+ async_callback = AsyncCallback,
+ sync_callback = SyncCallback,
+
len = DeltaCount1,
persistent_count = DeltaCount1,
@@ -1112,6 +1124,7 @@ init(IsDurable, IndexState, DeltaCount, Terms,
msgs_on_disk = gb_sets:new(),
msg_indices_on_disk = gb_sets:new(),
unconfirmed = gb_sets:new(),
+ confirmed = gb_sets:new(),
ack_out_counter = 0,
ack_in_counter = 0,
ack_rates = blank_rate(Now, 0) },
@@ -1124,24 +1137,20 @@ blank_rate(Timestamp, IngressLength) ->
avg_ingress = 0.0,
timestamp = Timestamp }.
-msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun) ->
- Self = self(),
- F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
- Self, ?MODULE,
- fun (StateN) -> {[], tx_commit_post_msg_store(
- true, Pubs, AckTags,
- Fun, MsgPropsFun, StateN)}
- end)
- end,
- fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler(
- fun () -> remove_persistent_messages(
- PersistentMsgIds)
- end, F)
- end)
+msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun,
+ AsyncCallback, SyncCallback) ->
+ case SyncCallback(?MODULE,
+ fun (?MODULE, StateN) ->
+ tx_commit_post_msg_store(true, Pubs, AckTags,
+ Fun, MsgPropsFun, StateN)
+ end) of
+ ok -> ok;
+ error -> remove_persistent_messages(PersistentMsgIds, AsyncCallback)
end.
-remove_persistent_messages(MsgIds) ->
- PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, undefined),
+remove_persistent_messages(MsgIds, AsyncCallback) ->
+ PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE,
+ undefined, AsyncCallback),
ok = rabbit_msg_store:remove(MsgIds, PersistentClient),
rabbit_msg_store:client_delete_and_terminate(PersistentClient).
@@ -1432,12 +1441,14 @@ confirm_commit_index(State = #vqstate { index_state = IndexState }) ->
false -> State
end.
-remove_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
+record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC,
+ confirmed = C }) ->
State #vqstate { msgs_on_disk = gb_sets:difference(MOD, MsgIdSet),
msg_indices_on_disk = gb_sets:difference(MIOD, MsgIdSet),
- unconfirmed = gb_sets:difference(UC, MsgIdSet) }.
+ unconfirmed = gb_sets:difference(UC, MsgIdSet),
+ confirmed = gb_sets:union (C, MsgIdSet) }.
needs_index_sync(#vqstate { msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->
@@ -1454,40 +1465,35 @@ needs_index_sync(#vqstate { msg_indices_on_disk = MIOD,
%% subtraction.
not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)).
-msgs_confirmed(MsgIdSet, State) ->
- {gb_sets:to_list(MsgIdSet), remove_confirms(MsgIdSet, State)}.
-
-blind_confirm(QPid, MsgIdSet) ->
- rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, ?MODULE, fun (State) -> msgs_confirmed(MsgIdSet, State) end).
-
-msgs_written_to_disk(QPid, MsgIdSet, removed) ->
- blind_confirm(QPid, MsgIdSet);
-msgs_written_to_disk(QPid, MsgIdSet, written) ->
- rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, ?MODULE,
- fun (State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- msgs_confirmed(gb_sets:intersection(MsgIdSet, MIOD),
- State #vqstate {
- msgs_on_disk =
- gb_sets:union(
- MOD, gb_sets:intersection(UC, MsgIdSet)) })
- end).
-
-msg_indices_written_to_disk(QPid, MsgIdSet) ->
- rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, ?MODULE,
- fun (State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- msgs_confirmed(gb_sets:intersection(MsgIdSet, MOD),
- State #vqstate {
- msg_indices_on_disk =
- gb_sets:union(
- MIOD, gb_sets:intersection(UC, MsgIdSet)) })
- end).
+blind_confirm(Callback, MsgIdSet) ->
+ Callback(?MODULE,
+ fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end).
+
+msgs_written_to_disk(Callback, MsgIdSet, removed) ->
+ blind_confirm(Callback, MsgIdSet);
+msgs_written_to_disk(Callback, MsgIdSet, written) ->
+ Callback(?MODULE,
+ fun (?MODULE, State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
+ Confirmed = gb_sets:intersection(UC, MsgIdSet),
+ record_confirms(gb_sets:intersection(MsgIdSet, MIOD),
+ State #vqstate {
+ msgs_on_disk =
+ gb_sets:union(MOD, Confirmed) })
+ end).
+
+msg_indices_written_to_disk(Callback, MsgIdSet) ->
+ Callback(?MODULE,
+ fun (?MODULE, State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
+ Confirmed = gb_sets:intersection(UC, MsgIdSet),
+ record_confirms(gb_sets:intersection(MsgIdSet, MOD),
+ State #vqstate {
+ msg_indices_on_disk =
+ gb_sets:union(MIOD, Confirmed) })
+ end).
%%----------------------------------------------------------------------------
%% Phase changes