diff options
-rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 6 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 18 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 2 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 16 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 162 |
5 files changed, 114 insertions, 90 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index accb2c0e..2e4d1b0a 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -25,11 +25,13 @@ -type(message_properties_transformer() :: fun ((rabbit_types:message_properties()) -> rabbit_types:message_properties())). +-type(async_callback() :: fun ((fun ((state()) -> state())) -> 'ok')). +-type(sync_callback() :: fun ((fun ((state()) -> state())) -> 'ok' | 'error')). -spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok'). -spec(stop/0 :: () -> 'ok'). --spec(init/3 :: (rabbit_amqqueue:name(), is_durable(), attempt_recovery()) -> - state()). +-spec(init/5 :: (rabbit_amqqueue:name(), is_durable(), attempt_recovery(), + async_callback(), sync_callback()) -> state()). -spec(terminate/1 :: (state()) -> state()). -spec(delete_and_terminate/1 :: (state()) -> state()). -spec(purge/1 :: (state()) -> {purged_msg_count(), state()}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 44053593..cf2a3949 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -149,7 +149,7 @@ declare(Recover, From, ok = rabbit_memory_monitor:register( self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), - BQS = BQ:init(QName, IsDurable, Recover), + BQS = bq_init(BQ, QName, IsDurable, Recover), State1 = process_args(State#q{backing_queue_state = BQS}), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), @@ -159,6 +159,20 @@ declare(Recover, From, Q1 -> {stop, normal, {existing, Q1}, State} end. +bq_init(BQ, QName, IsDurable, Recover) -> + Self = self(), + BQ:init(QName, IsDurable, Recover, + fun (Fun) -> + rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( + Self, Fun) + end, + fun (Fun) -> + rabbit_misc:with_exit_handler( + fun () -> error end, + fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( + Self, Fun) end) + end). + process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> lists:foldl(fun({Arg, Fun}, State1) -> case rabbit_misc:table_lookup(Arguments, Arg) of @@ -797,7 +811,7 @@ handle_call({init, Recover}, From, _ -> rabbit_log:warning( "Queue ~p exclusive owner went away~n", [QName]) end, - BQS = BQ:init(QName, IsDurable, Recover), + BQS = bq_init(BQ, QName, IsDurable, Recover), %% Rely on terminate to delete the queue. {stop, normal, State#q{backing_queue_state = BQS}} end; diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 6a21e10f..a8e201ea 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -33,7 +33,7 @@ behaviour_info(callbacks) -> {stop, 0}, %% Initialise the backing queue and its state. - {init, 3}, + {init, 5}, %% Called on queue shutdown when queue isn't being deleted. {terminate, 1}, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 0c6250df..99bb1c4b 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2003,6 +2003,10 @@ test_queue_index() -> passed. +variable_queue_init(QName, IsDurable, Recover) -> + rabbit_variable_queue:init(QName, IsDurable, Recover, + fun nop/1, fun nop/1, fun nop/2, fun nop/1). + variable_queue_publish(IsPersistent, Count, VQ) -> lists:foldl( fun (_N, VQN) -> @@ -2033,8 +2037,7 @@ assert_props(List, PropVals) -> with_fresh_variable_queue(Fun) -> ok = empty_test_queue(), - VQ = rabbit_variable_queue:init(test_queue(), true, false, - fun nop/2, fun nop/1), + VQ = variable_queue_init(test_queue(), true, false), S0 = rabbit_variable_queue:status(VQ), assert_props(S0, [{q1, 0}, {q2, 0}, {delta, {delta, undefined, 0, undefined}}, @@ -2209,8 +2212,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> {VQ5, _AckTags1} = variable_queue_fetch(Count, false, false, Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_queue(), true, true, - fun nop/2, fun nop/1), + VQ7 = variable_queue_init(test_queue(), true, true), {{_Msg1, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), VQ9 = variable_queue_publish(false, 1, VQ8), @@ -2226,8 +2228,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), VQ5 = rabbit_variable_queue:idle_timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_queue(), true, true, - fun nop/2, fun nop/1), + VQ7 = variable_queue_init(test_queue(), true, true), {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. @@ -2258,8 +2259,7 @@ test_queue_recover() -> {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), - VQ1 = rabbit_variable_queue:init(QName, true, true, - fun nop/2, fun nop/1), + VQ1 = variable_queue_init(QName, true, true), {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 58a28d32..7f702409 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -16,7 +16,7 @@ -module(rabbit_variable_queue). --export([init/3, terminate/1, delete_and_terminate/1, +-export([init/5, terminate/1, delete_and_terminate/1, purge/1, publish/3, publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, dropwhile/2, @@ -27,7 +27,7 @@ -export([start/1, stop/0]). %% exported for testing only --export([start_msg_store/2, stop_msg_store/0, init/5]). +-export([start_msg_store/2, stop_msg_store/0, init/7]). %%---------------------------------------------------------------------------- %% Definitions: @@ -238,6 +238,9 @@ durable, transient_threshold, + async_callback, + sync_callback, + len, persistent_count, @@ -332,11 +335,14 @@ {any(), binary()}}, on_sync :: sync(), durable :: boolean(), + transient_threshold :: non_neg_integer(), + + async_callback :: async_callback(), + sync_callback :: sync_callback(), len :: non_neg_integer(), persistent_count :: non_neg_integer(), - transient_threshold :: non_neg_integer(), target_ram_count :: non_neg_integer() | 'infinity', ram_msg_count :: non_neg_integer(), ram_msg_count_prev :: non_neg_integer(), @@ -397,25 +403,26 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). -init(QueueName, IsDurable, Recover) -> - Self = self(), - init(QueueName, IsDurable, Recover, +init(QueueName, IsDurable, Recover, AsyncCallback, SyncCallback) -> + init(QueueName, IsDurable, Recover, AsyncCallback, SyncCallback, fun (Guids, ActionTaken) -> - msgs_written_to_disk(Self, Guids, ActionTaken) + msgs_written_to_disk(AsyncCallback, Guids, ActionTaken) end, - fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end). + fun (Guids) -> msg_indices_written_to_disk(AsyncCallback, Guids) end). -init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) -> +init(QueueName, IsDurable, false, AsyncCallback, SyncCallback, + MsgOnDiskFun, MsgIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), - init(IsDurable, IndexState, 0, [], + init(IsDurable, IndexState, 0, [], AsyncCallback, SyncCallback, case IsDurable of true -> msg_store_client_init(?PERSISTENT_MSG_STORE, - MsgOnDiskFun); + MsgOnDiskFun, AsyncCallback); false -> undefined end, - msg_store_client_init(?TRANSIENT_MSG_STORE, undefined)); + msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback)); -init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) -> +init(QueueName, true, true, AsyncCallback, SyncCallback, + MsgOnDiskFun, MsgIdxOnDiskFun) -> Terms = rabbit_queue_index:shutdown_terms(QueueName), {PRef, TRef, Terms1} = case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of @@ -425,9 +432,9 @@ init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) -> _ -> {rabbit_guid:guid(), rabbit_guid:guid(), []} end, PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, - MsgOnDiskFun), + MsgOnDiskFun, AsyncCallback), TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, TRef, - undefined), + undefined, AsyncCallback), {DeltaCount, IndexState} = rabbit_queue_index:recover( QueueName, Terms1, @@ -437,7 +444,7 @@ init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) -> end, MsgIdxOnDiskFun), init(true, IndexState, DeltaCount, Terms1, - PersistentClient, TransientClient). + PersistentClient, TransientClient, AsyncCallback, SyncCallback). terminate(State) -> State1 = #vqstate { persistent_count = PCount, @@ -512,9 +519,9 @@ publish(Msg, MsgProps, State) -> publish_delivered(false, #basic_message { guid = Guid }, #message_properties { needs_confirming = NeedsConfirming }, - State = #vqstate { len = 0 }) -> + State = #vqstate { async_callback = Callback, len = 0 }) -> case NeedsConfirming of - true -> blind_confirm(self(), gb_sets:singleton(Guid)); + true -> blind_confirm(Callback, gb_sets:singleton(Guid)); false -> ok end, {undefined, a(State)}; @@ -685,6 +692,8 @@ tx_rollback(Txn, State = #vqstate { durable = IsDurable, tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable, + async_callback = AsyncCallback, + sync_callback = SyncCallback, msg_store_clients = MSCState }) -> #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), erase_tx(Txn), @@ -696,7 +705,8 @@ tx_commit(Txn, Fun, MsgPropsFun, true -> ok = msg_store_sync( MSCState, true, PersistentGuids, msg_store_callback(PersistentGuids, Pubs, AckTags1, - Fun, MsgPropsFun)), + Fun, MsgPropsFun, + AsyncCallback, SyncCallback)), State; false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1, Fun, MsgPropsFun, State) @@ -929,13 +939,13 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) -> end), Res. -msg_store_client_init(MsgStore, MsgOnDiskFun) -> - msg_store_client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun). +msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) -> + msg_store_client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun, Callback). -msg_store_client_init(MsgStore, Ref, MsgOnDiskFun) -> +msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) -> rabbit_msg_store:client_init( MsgStore, Ref, MsgOnDiskFun, - msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE)). + msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE, Callback)). msg_store_write(MSCState, IsPersistent, Guid, Msg) -> with_immutable_msg_store_state( @@ -967,16 +977,13 @@ msg_store_close_fds(MSCState, IsPersistent) -> MSCState, IsPersistent, fun (MSCState1) -> rabbit_msg_store:close_all_indicated(MSCState1) end). -msg_store_close_fds_fun(IsPersistent) -> - Self = self(), - fun () -> - rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - Self, - fun (State = #vqstate { msg_store_clients = MSCState }) -> - {ok, MSCState1} = - msg_store_close_fds(MSCState, IsPersistent), - {[], State #vqstate { msg_store_clients = MSCState1 }} - end) +msg_store_close_fds_fun(IsPersistent, Callback) -> + fun () -> Callback( + fun (State = #vqstate { msg_store_clients = MSCState }) -> + {ok, MSCState1} = + msg_store_close_fds(MSCState, IsPersistent), + {[], State #vqstate { msg_store_clients = MSCState1 }} + end) end. maybe_write_delivered(false, _SeqId, IndexState) -> @@ -1062,7 +1069,7 @@ update_rate(Now, Then, Count, {OThen, OCount}) -> %%---------------------------------------------------------------------------- init(IsDurable, IndexState, DeltaCount, Terms, - PersistentClient, TransientClient) -> + PersistentClient, TransientClient, AsyncCallback, SyncCallback) -> {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount), @@ -1088,6 +1095,9 @@ init(IsDurable, IndexState, DeltaCount, Terms, durable = IsDurable, transient_threshold = NextSeqId, + async_callback = AsyncCallback, + sync_callback = SyncCallback, + len = DeltaCount1, persistent_count = DeltaCount1, @@ -1114,23 +1124,24 @@ blank_rate(Timestamp, IngressLength) -> avg_ingress = 0.0, timestamp = Timestamp }. -msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) -> - Self = self(), - F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( - Self, fun (StateN) -> {[], tx_commit_post_msg_store( - true, Pubs, AckTags, - Fun, MsgPropsFun, StateN)} - end) - end, - fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler( - fun () -> remove_persistent_messages( - PersistentGuids) - end, F) +msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun, + AsyncCallback, SyncCallback) -> + fun () -> spawn(fun () -> case SyncCallback( + fun (StateN) -> + tx_commit_post_msg_store( + true, Pubs, AckTags, + Fun, MsgPropsFun, StateN) + end) of + ok -> ok; + error -> remove_persistent_messages( + PersistentGuids, AsyncCallback) + end end) end. -remove_persistent_messages(Guids) -> - PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, undefined), +remove_persistent_messages(Guids, AsyncCallback) -> + PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, + undefined, AsyncCallback), ok = rabbit_msg_store:remove(Guids, PersistentClient), rabbit_msg_store:client_delete_and_terminate(PersistentClient). @@ -1442,35 +1453,32 @@ needs_index_sync(#vqstate { msg_indices_on_disk = MIOD, msgs_confirmed(GuidSet, State) -> {gb_sets:to_list(GuidSet), remove_confirms(GuidSet, State)}. -blind_confirm(QPid, GuidSet) -> - rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - QPid, fun (State) -> msgs_confirmed(GuidSet, State) end). - -msgs_written_to_disk(QPid, GuidSet, removed) -> - blind_confirm(QPid, GuidSet); -msgs_written_to_disk(QPid, GuidSet, written) -> - rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - QPid, fun (State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> - msgs_confirmed(gb_sets:intersection(GuidSet, MIOD), - State #vqstate { - msgs_on_disk = - gb_sets:union( - MOD, gb_sets:intersection(UC, GuidSet)) }) - end). - -msg_indices_written_to_disk(QPid, GuidSet) -> - rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - QPid, fun (State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> - msgs_confirmed(gb_sets:intersection(GuidSet, MOD), - State #vqstate { - msg_indices_on_disk = - gb_sets:union( - MIOD, gb_sets:intersection(UC, GuidSet)) }) - end). +blind_confirm(Callback, GuidSet) -> + Callback(fun (State) -> msgs_confirmed(GuidSet, State) end). + +msgs_written_to_disk(Callback, GuidSet, removed) -> + blind_confirm(Callback, GuidSet); +msgs_written_to_disk(Callback, GuidSet, written) -> + Callback(fun (State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> + msgs_confirmed(gb_sets:intersection(GuidSet, MIOD), + State #vqstate { + msgs_on_disk = + gb_sets:union( + MOD, gb_sets:intersection(UC, GuidSet)) }) + end). + +msg_indices_written_to_disk(Callback, GuidSet) -> + Callback(fun (State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> + msgs_confirmed(gb_sets:intersection(GuidSet, MOD), + State #vqstate { + msg_indices_on_disk = + gb_sets:union( + MIOD, gb_sets:intersection(UC, GuidSet)) }) + end). %%---------------------------------------------------------------------------- %% Phase changes |