summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit_backing_queue_spec.hrl6
-rw-r--r--src/rabbit_amqqueue_process.erl18
-rw-r--r--src/rabbit_backing_queue.erl2
-rw-r--r--src/rabbit_tests.erl16
-rw-r--r--src/rabbit_variable_queue.erl162
5 files changed, 114 insertions, 90 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index accb2c0e..2e4d1b0a 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -25,11 +25,13 @@
-type(message_properties_transformer() ::
fun ((rabbit_types:message_properties())
-> rabbit_types:message_properties())).
+-type(async_callback() :: fun ((fun ((state()) -> state())) -> 'ok')).
+-type(sync_callback() :: fun ((fun ((state()) -> state())) -> 'ok' | 'error')).
-spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok').
-spec(stop/0 :: () -> 'ok').
--spec(init/3 :: (rabbit_amqqueue:name(), is_durable(), attempt_recovery()) ->
- state()).
+-spec(init/5 :: (rabbit_amqqueue:name(), is_durable(), attempt_recovery(),
+ async_callback(), sync_callback()) -> state()).
-spec(terminate/1 :: (state()) -> state()).
-spec(delete_and_terminate/1 :: (state()) -> state()).
-spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 44053593..cf2a3949 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -149,7 +149,7 @@ declare(Recover, From,
ok = rabbit_memory_monitor:register(
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
- BQS = BQ:init(QName, IsDurable, Recover),
+ BQS = bq_init(BQ, QName, IsDurable, Recover),
State1 = process_args(State#q{backing_queue_state = BQS}),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
@@ -159,6 +159,20 @@ declare(Recover, From,
Q1 -> {stop, normal, {existing, Q1}, State}
end.
+bq_init(BQ, QName, IsDurable, Recover) ->
+ Self = self(),
+ BQ:init(QName, IsDurable, Recover,
+ fun (Fun) ->
+ rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
+ Self, Fun)
+ end,
+ fun (Fun) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> error end,
+ fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
+ Self, Fun) end)
+ end).
+
process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
lists:foldl(fun({Arg, Fun}, State1) ->
case rabbit_misc:table_lookup(Arguments, Arg) of
@@ -797,7 +811,7 @@ handle_call({init, Recover}, From,
_ -> rabbit_log:warning(
"Queue ~p exclusive owner went away~n", [QName])
end,
- BQS = BQ:init(QName, IsDurable, Recover),
+ BQS = bq_init(BQ, QName, IsDurable, Recover),
%% Rely on terminate to delete the queue.
{stop, normal, State#q{backing_queue_state = BQS}}
end;
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 6a21e10f..a8e201ea 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -33,7 +33,7 @@ behaviour_info(callbacks) ->
{stop, 0},
%% Initialise the backing queue and its state.
- {init, 3},
+ {init, 5},
%% Called on queue shutdown when queue isn't being deleted.
{terminate, 1},
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 0c6250df..99bb1c4b 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2003,6 +2003,10 @@ test_queue_index() ->
passed.
+variable_queue_init(QName, IsDurable, Recover) ->
+ rabbit_variable_queue:init(QName, IsDurable, Recover,
+ fun nop/1, fun nop/1, fun nop/2, fun nop/1).
+
variable_queue_publish(IsPersistent, Count, VQ) ->
lists:foldl(
fun (_N, VQN) ->
@@ -2033,8 +2037,7 @@ assert_props(List, PropVals) ->
with_fresh_variable_queue(Fun) ->
ok = empty_test_queue(),
- VQ = rabbit_variable_queue:init(test_queue(), true, false,
- fun nop/2, fun nop/1),
+ VQ = variable_queue_init(test_queue(), true, false),
S0 = rabbit_variable_queue:status(VQ),
assert_props(S0, [{q1, 0}, {q2, 0},
{delta, {delta, undefined, 0, undefined}},
@@ -2209,8 +2212,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
{VQ5, _AckTags1} = variable_queue_fetch(Count, false, false,
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
- fun nop/2, fun nop/1),
+ VQ7 = variable_queue_init(test_queue(), true, true),
{{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
VQ9 = variable_queue_publish(false, 1, VQ8),
@@ -2226,8 +2228,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
VQ5 = rabbit_variable_queue:idle_timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
- fun nop/2, fun nop/1),
+ VQ7 = variable_queue_init(test_queue(), true, true),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
@@ -2258,8 +2259,7 @@ test_queue_recover() ->
{ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
- VQ1 = rabbit_variable_queue:init(QName, true, true,
- fun nop/2, fun nop/1),
+ VQ1 = variable_queue_init(QName, true, true),
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
_VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 58a28d32..7f702409 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,7 +16,7 @@
-module(rabbit_variable_queue).
--export([init/3, terminate/1, delete_and_terminate/1,
+-export([init/5, terminate/1, delete_and_terminate/1,
purge/1, publish/3, publish_delivered/4, fetch/2, ack/2,
tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4,
requeue/3, len/1, is_empty/1, dropwhile/2,
@@ -27,7 +27,7 @@
-export([start/1, stop/0]).
%% exported for testing only
--export([start_msg_store/2, stop_msg_store/0, init/5]).
+-export([start_msg_store/2, stop_msg_store/0, init/7]).
%%----------------------------------------------------------------------------
%% Definitions:
@@ -238,6 +238,9 @@
durable,
transient_threshold,
+ async_callback,
+ sync_callback,
+
len,
persistent_count,
@@ -332,11 +335,14 @@
{any(), binary()}},
on_sync :: sync(),
durable :: boolean(),
+ transient_threshold :: non_neg_integer(),
+
+ async_callback :: async_callback(),
+ sync_callback :: sync_callback(),
len :: non_neg_integer(),
persistent_count :: non_neg_integer(),
- transient_threshold :: non_neg_integer(),
target_ram_count :: non_neg_integer() | 'infinity',
ram_msg_count :: non_neg_integer(),
ram_msg_count_prev :: non_neg_integer(),
@@ -397,25 +403,26 @@ stop_msg_store() ->
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
-init(QueueName, IsDurable, Recover) ->
- Self = self(),
- init(QueueName, IsDurable, Recover,
+init(QueueName, IsDurable, Recover, AsyncCallback, SyncCallback) ->
+ init(QueueName, IsDurable, Recover, AsyncCallback, SyncCallback,
fun (Guids, ActionTaken) ->
- msgs_written_to_disk(Self, Guids, ActionTaken)
+ msgs_written_to_disk(AsyncCallback, Guids, ActionTaken)
end,
- fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end).
+ fun (Guids) -> msg_indices_written_to_disk(AsyncCallback, Guids) end).
-init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) ->
+init(QueueName, IsDurable, false, AsyncCallback, SyncCallback,
+ MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
- init(IsDurable, IndexState, 0, [],
+ init(IsDurable, IndexState, 0, [], AsyncCallback, SyncCallback,
case IsDurable of
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
- MsgOnDiskFun);
+ MsgOnDiskFun, AsyncCallback);
false -> undefined
end,
- msg_store_client_init(?TRANSIENT_MSG_STORE, undefined));
+ msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback));
-init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) ->
+init(QueueName, true, true, AsyncCallback, SyncCallback,
+ MsgOnDiskFun, MsgIdxOnDiskFun) ->
Terms = rabbit_queue_index:shutdown_terms(QueueName),
{PRef, TRef, Terms1} =
case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of
@@ -425,9 +432,9 @@ init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) ->
_ -> {rabbit_guid:guid(), rabbit_guid:guid(), []}
end,
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
- MsgOnDiskFun),
+ MsgOnDiskFun, AsyncCallback),
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, TRef,
- undefined),
+ undefined, AsyncCallback),
{DeltaCount, IndexState} =
rabbit_queue_index:recover(
QueueName, Terms1,
@@ -437,7 +444,7 @@ init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) ->
end,
MsgIdxOnDiskFun),
init(true, IndexState, DeltaCount, Terms1,
- PersistentClient, TransientClient).
+ PersistentClient, TransientClient, AsyncCallback, SyncCallback).
terminate(State) ->
State1 = #vqstate { persistent_count = PCount,
@@ -512,9 +519,9 @@ publish(Msg, MsgProps, State) ->
publish_delivered(false, #basic_message { guid = Guid },
#message_properties {
needs_confirming = NeedsConfirming },
- State = #vqstate { len = 0 }) ->
+ State = #vqstate { async_callback = Callback, len = 0 }) ->
case NeedsConfirming of
- true -> blind_confirm(self(), gb_sets:singleton(Guid));
+ true -> blind_confirm(Callback, gb_sets:singleton(Guid));
false -> ok
end,
{undefined, a(State)};
@@ -685,6 +692,8 @@ tx_rollback(Txn, State = #vqstate { durable = IsDurable,
tx_commit(Txn, Fun, MsgPropsFun,
State = #vqstate { durable = IsDurable,
+ async_callback = AsyncCallback,
+ sync_callback = SyncCallback,
msg_store_clients = MSCState }) ->
#tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
erase_tx(Txn),
@@ -696,7 +705,8 @@ tx_commit(Txn, Fun, MsgPropsFun,
true -> ok = msg_store_sync(
MSCState, true, PersistentGuids,
msg_store_callback(PersistentGuids, Pubs, AckTags1,
- Fun, MsgPropsFun)),
+ Fun, MsgPropsFun,
+ AsyncCallback, SyncCallback)),
State;
false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1,
Fun, MsgPropsFun, State)
@@ -929,13 +939,13 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
end),
Res.
-msg_store_client_init(MsgStore, MsgOnDiskFun) ->
- msg_store_client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun).
+msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) ->
+ msg_store_client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun, Callback).
-msg_store_client_init(MsgStore, Ref, MsgOnDiskFun) ->
+msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) ->
rabbit_msg_store:client_init(
MsgStore, Ref, MsgOnDiskFun,
- msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE)).
+ msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE, Callback)).
msg_store_write(MSCState, IsPersistent, Guid, Msg) ->
with_immutable_msg_store_state(
@@ -967,16 +977,13 @@ msg_store_close_fds(MSCState, IsPersistent) ->
MSCState, IsPersistent,
fun (MSCState1) -> rabbit_msg_store:close_all_indicated(MSCState1) end).
-msg_store_close_fds_fun(IsPersistent) ->
- Self = self(),
- fun () ->
- rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- Self,
- fun (State = #vqstate { msg_store_clients = MSCState }) ->
- {ok, MSCState1} =
- msg_store_close_fds(MSCState, IsPersistent),
- {[], State #vqstate { msg_store_clients = MSCState1 }}
- end)
+msg_store_close_fds_fun(IsPersistent, Callback) ->
+ fun () -> Callback(
+ fun (State = #vqstate { msg_store_clients = MSCState }) ->
+ {ok, MSCState1} =
+ msg_store_close_fds(MSCState, IsPersistent),
+ {[], State #vqstate { msg_store_clients = MSCState1 }}
+ end)
end.
maybe_write_delivered(false, _SeqId, IndexState) ->
@@ -1062,7 +1069,7 @@ update_rate(Now, Then, Count, {OThen, OCount}) ->
%%----------------------------------------------------------------------------
init(IsDurable, IndexState, DeltaCount, Terms,
- PersistentClient, TransientClient) ->
+ PersistentClient, TransientClient, AsyncCallback, SyncCallback) ->
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount),
@@ -1088,6 +1095,9 @@ init(IsDurable, IndexState, DeltaCount, Terms,
durable = IsDurable,
transient_threshold = NextSeqId,
+ async_callback = AsyncCallback,
+ sync_callback = SyncCallback,
+
len = DeltaCount1,
persistent_count = DeltaCount1,
@@ -1114,23 +1124,24 @@ blank_rate(Timestamp, IngressLength) ->
avg_ingress = 0.0,
timestamp = Timestamp }.
-msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) ->
- Self = self(),
- F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
- Self, fun (StateN) -> {[], tx_commit_post_msg_store(
- true, Pubs, AckTags,
- Fun, MsgPropsFun, StateN)}
- end)
- end,
- fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler(
- fun () -> remove_persistent_messages(
- PersistentGuids)
- end, F)
+msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun,
+ AsyncCallback, SyncCallback) ->
+ fun () -> spawn(fun () -> case SyncCallback(
+ fun (StateN) ->
+ tx_commit_post_msg_store(
+ true, Pubs, AckTags,
+ Fun, MsgPropsFun, StateN)
+ end) of
+ ok -> ok;
+ error -> remove_persistent_messages(
+ PersistentGuids, AsyncCallback)
+ end
end)
end.
-remove_persistent_messages(Guids) ->
- PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, undefined),
+remove_persistent_messages(Guids, AsyncCallback) ->
+ PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE,
+ undefined, AsyncCallback),
ok = rabbit_msg_store:remove(Guids, PersistentClient),
rabbit_msg_store:client_delete_and_terminate(PersistentClient).
@@ -1442,35 +1453,32 @@ needs_index_sync(#vqstate { msg_indices_on_disk = MIOD,
msgs_confirmed(GuidSet, State) ->
{gb_sets:to_list(GuidSet), remove_confirms(GuidSet, State)}.
-blind_confirm(QPid, GuidSet) ->
- rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun (State) -> msgs_confirmed(GuidSet, State) end).
-
-msgs_written_to_disk(QPid, GuidSet, removed) ->
- blind_confirm(QPid, GuidSet);
-msgs_written_to_disk(QPid, GuidSet, written) ->
- rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun (State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- msgs_confirmed(gb_sets:intersection(GuidSet, MIOD),
- State #vqstate {
- msgs_on_disk =
- gb_sets:union(
- MOD, gb_sets:intersection(UC, GuidSet)) })
- end).
-
-msg_indices_written_to_disk(QPid, GuidSet) ->
- rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun (State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- msgs_confirmed(gb_sets:intersection(GuidSet, MOD),
- State #vqstate {
- msg_indices_on_disk =
- gb_sets:union(
- MIOD, gb_sets:intersection(UC, GuidSet)) })
- end).
+blind_confirm(Callback, GuidSet) ->
+ Callback(fun (State) -> msgs_confirmed(GuidSet, State) end).
+
+msgs_written_to_disk(Callback, GuidSet, removed) ->
+ blind_confirm(Callback, GuidSet);
+msgs_written_to_disk(Callback, GuidSet, written) ->
+ Callback(fun (State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
+ msgs_confirmed(gb_sets:intersection(GuidSet, MIOD),
+ State #vqstate {
+ msgs_on_disk =
+ gb_sets:union(
+ MOD, gb_sets:intersection(UC, GuidSet)) })
+ end).
+
+msg_indices_written_to_disk(Callback, GuidSet) ->
+ Callback(fun (State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
+ msgs_confirmed(gb_sets:intersection(GuidSet, MOD),
+ State #vqstate {
+ msg_indices_on_disk =
+ gb_sets:union(
+ MIOD, gb_sets:intersection(UC, GuidSet)) })
+ end).
%%----------------------------------------------------------------------------
%% Phase changes