diff options
Diffstat (limited to 'src/rabbit_variable_queue.erl')
-rw-r--r-- | src/rabbit_variable_queue.erl | 425 |
1 files changed, 231 insertions, 194 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 7142d560..ff7252fd 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -16,18 +16,18 @@ -module(rabbit_variable_queue). --export([init/3, terminate/1, delete_and_terminate/1, - purge/1, publish/3, publish_delivered/4, fetch/2, ack/2, - tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, +-export([init/5, terminate/1, delete_and_terminate/1, + purge/1, publish/3, publish_delivered/4, drain_confirmed/1, + fetch/2, ack/2, tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, - status/1]). + status/1, multiple_routing_keys/0]). -export([start/1, stop/0]). %% exported for testing only --export([start_msg_store/2, stop_msg_store/0, init/5]). +-export([start_msg_store/2, stop_msg_store/0, init/7]). %%---------------------------------------------------------------------------- %% Definitions: @@ -150,13 +150,16 @@ %% responsive. %% %% In the queue we keep track of both messages that are pending -%% delivery and messages that are pending acks. This ensures that -%% purging (deleting the former) and deletion (deleting the former and -%% the latter) are both cheap and do require any scanning through qi -%% segments. +%% delivery and messages that are pending acks. In the event of a +%% queue purge, we only need to load qi segments if the queue has +%% elements in deltas (i.e. it came under significant memory +%% pressure). In the event of a queue deletion, in addition to the +%% preceding, by keeping track of pending acks in RAM, we do not need +%% to search through qi segments looking for messages that are yet to +%% be acknowledged. %% %% Pending acks are recorded in memory either as the tuple {SeqId, -%% Guid, MsgProps} (tuple-form) or as the message itself (message- +%% MsgId, MsgProps} (tuple-form) or as the message itself (message- %% form). Acks for persistent messages are always stored in the tuple- %% form. Acks for transient messages are also stored in tuple-form if %% the message has been sent to disk as part of the memory reduction @@ -238,6 +241,9 @@ durable, transient_threshold, + async_callback, + sync_callback, + len, persistent_count, @@ -252,6 +258,7 @@ msgs_on_disk, msg_indices_on_disk, unconfirmed, + confirmed, ack_out_counter, ack_in_counter, ack_rates @@ -261,20 +268,20 @@ -record(msg_status, { seq_id, - guid, + msg_id, msg, is_persistent, is_delivered, msg_on_disk, index_on_disk, msg_props - }). + }). -record(delta, { start_seq_id, %% start_seq_id is inclusive count, end_seq_id %% end_seq_id is exclusive - }). + }). -record(tx, { pending_messages, pending_acks }). @@ -294,6 +301,8 @@ %%---------------------------------------------------------------------------- +-rabbit_upgrade({multiple_routing_keys, local, []}). + -ifdef(use_specs). -type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}). @@ -330,11 +339,14 @@ {any(), binary()}}, on_sync :: sync(), durable :: boolean(), + transient_threshold :: non_neg_integer(), + + async_callback :: async_callback(), + sync_callback :: sync_callback(), len :: non_neg_integer(), persistent_count :: non_neg_integer(), - transient_threshold :: non_neg_integer(), target_ram_count :: non_neg_integer() | 'infinity', ram_msg_count :: non_neg_integer(), ram_msg_count_prev :: non_neg_integer(), @@ -345,12 +357,15 @@ msgs_on_disk :: gb_set(), msg_indices_on_disk :: gb_set(), unconfirmed :: gb_set(), + confirmed :: gb_set(), ack_out_counter :: non_neg_integer(), ack_in_counter :: non_neg_integer(), ack_rates :: rates() }). -include("rabbit_backing_queue_spec.hrl"). +-spec(multiple_routing_keys/0 :: () -> 'ok'). + -endif. -define(BLANK_DELTA, #delta { start_seq_id = undefined, @@ -393,25 +408,26 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). -init(QueueName, IsDurable, Recover) -> - Self = self(), - init(QueueName, IsDurable, Recover, - fun (Guids, ActionTaken) -> - msgs_written_to_disk(Self, Guids, ActionTaken) +init(QueueName, IsDurable, Recover, AsyncCallback, SyncCallback) -> + init(QueueName, IsDurable, Recover, AsyncCallback, SyncCallback, + fun (MsgIds, ActionTaken) -> + msgs_written_to_disk(AsyncCallback, MsgIds, ActionTaken) end, - fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end). + fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end). -init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) -> +init(QueueName, IsDurable, false, AsyncCallback, SyncCallback, + MsgOnDiskFun, MsgIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), - init(IsDurable, IndexState, 0, [], + init(IsDurable, IndexState, 0, [], AsyncCallback, SyncCallback, case IsDurable of true -> msg_store_client_init(?PERSISTENT_MSG_STORE, - MsgOnDiskFun); + MsgOnDiskFun, AsyncCallback); false -> undefined end, - msg_store_client_init(?TRANSIENT_MSG_STORE, undefined)); + msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback)); -init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) -> +init(QueueName, true, true, AsyncCallback, SyncCallback, + MsgOnDiskFun, MsgIdxOnDiskFun) -> Terms = rabbit_queue_index:shutdown_terms(QueueName), {PRef, TRef, Terms1} = case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of @@ -421,18 +437,18 @@ init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) -> _ -> {rabbit_guid:guid(), rabbit_guid:guid(), []} end, PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, - MsgOnDiskFun), + MsgOnDiskFun, AsyncCallback), TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, TRef, - undefined), + undefined, AsyncCallback), {DeltaCount, IndexState} = rabbit_queue_index:recover( QueueName, Terms1, rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), - fun (Guid) -> - rabbit_msg_store:contains(Guid, PersistentClient) + fun (MsgId) -> + rabbit_msg_store:contains(MsgId, PersistentClient) end, MsgIdxOnDiskFun), - init(true, IndexState, DeltaCount, Terms1, + init(true, IndexState, DeltaCount, Terms1, AsyncCallback, SyncCallback, PersistentClient, TransientClient). terminate(State) -> @@ -505,12 +521,16 @@ publish(Msg, MsgProps, State) -> {_SeqId, State1} = publish(Msg, MsgProps, false, false, State), a(reduce_memory_use(State1)). -publish_delivered(false, #basic_message { guid = Guid }, - _MsgProps, State = #vqstate { len = 0 }) -> - blind_confirm(self(), gb_sets:singleton(Guid)), +publish_delivered(false, #basic_message { id = MsgId }, + #message_properties { needs_confirming = NeedsConfirming }, + State = #vqstate { async_callback = Callback, len = 0 }) -> + case NeedsConfirming of + true -> blind_confirm(Callback, gb_sets:singleton(MsgId)); + false -> ok + end, {undefined, a(State)}; publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, - guid = Guid }, + id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, State = #vqstate { len = 0, @@ -526,7 +546,7 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = record_pending_ack(m(MsgStatus1), State1), PCount1 = PCount + one_if(IsPersistent1), - UC1 = gb_sets_maybe_insert(NeedsConfirming, Guid, UC), + UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), {SeqId, a(reduce_memory_use( State2 #vqstate { next_seq_id = SeqId + 1, out_counter = OutCount + 1, @@ -534,9 +554,12 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, persistent_count = PCount1, unconfirmed = UC1 }))}. +drain_confirmed(State = #vqstate { confirmed = C }) -> + {gb_sets:to_list(C), State #vqstate { confirmed = gb_sets:new() }}. + dropwhile(Pred, State) -> {_OkOrEmpty, State1} = dropwhile1(Pred, State), - State1. + a(State1). dropwhile1(Pred, State) -> internal_queue_out( @@ -577,12 +600,12 @@ internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) -> end. read_msg(MsgStatus = #msg_status { msg = undefined, - guid = Guid, + msg_id = MsgId, is_persistent = IsPersistent }, State = #vqstate { ram_msg_count = RamMsgCount, msg_store_clients = MSCState}) -> {{ok, Msg = #basic_message {}}, MSCState1} = - msg_store_read(MSCState, IsPersistent, Guid), + msg_store_read(MSCState, IsPersistent, MsgId), {MsgStatus #msg_status { msg = Msg }, State #vqstate { ram_msg_count = RamMsgCount + 1, msg_store_clients = MSCState1 }}; @@ -591,7 +614,7 @@ read_msg(MsgStatus, State) -> internal_fetch(AckRequired, MsgStatus = #msg_status { seq_id = SeqId, - guid = Guid, + msg_id = MsgId, msg = Msg, is_persistent = IsPersistent, is_delivered = IsDelivered, @@ -610,7 +633,7 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { %% 2. Remove from msg_store and queue index, if necessary Rem = fun () -> - ok = msg_store_remove(MSCState, IsPersistent, [Guid]) + ok = msg_store_remove(MSCState, IsPersistent, [MsgId]) end, Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, IndexState2 = @@ -623,12 +646,12 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { %% 3. If an ack is required, add something sensible to PA {AckTag, State1} = case AckRequired of - true -> StateN = record_pending_ack( - MsgStatus #msg_status { - is_delivered = true }, State), - {SeqId, StateN}; - false -> {undefined, State} - end, + true -> StateN = record_pending_ack( + MsgStatus #msg_status { + is_delivered = true }, State), + {SeqId, StateN}; + false -> {undefined, State} + end, PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), Len1 = Len - 1, @@ -669,25 +692,31 @@ tx_rollback(Txn, State = #vqstate { durable = IsDurable, #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), erase_tx(Txn), ok = case IsDurable of - true -> msg_store_remove(MSCState, true, persistent_guids(Pubs)); + true -> msg_store_remove(MSCState, true, + persistent_msg_ids(Pubs)); false -> ok end, {lists:append(AckTags), a(State)}. tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable, + async_callback = AsyncCallback, + sync_callback = SyncCallback, msg_store_clients = MSCState }) -> #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), erase_tx(Txn), AckTags1 = lists:append(AckTags), - PersistentGuids = persistent_guids(Pubs), - HasPersistentPubs = PersistentGuids =/= [], + PersistentMsgIds = persistent_msg_ids(Pubs), + HasPersistentPubs = PersistentMsgIds =/= [], {AckTags1, a(case IsDurable andalso HasPersistentPubs of - true -> ok = msg_store_sync( - MSCState, true, PersistentGuids, - msg_store_callback(PersistentGuids, Pubs, AckTags1, - Fun, MsgPropsFun)), + true -> MsgStoreCallback = + fun () -> msg_store_callback( + PersistentMsgIds, Pubs, AckTags1, Fun, + MsgPropsFun, AsyncCallback, SyncCallback) + end, + ok = msg_store_sync(MSCState, true, PersistentMsgIds, + fun () -> spawn(MsgStoreCallback) end), State; false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1, Fun, MsgPropsFun, State) @@ -699,15 +728,15 @@ requeue(AckTags, MsgPropsFun, State) -> needs_confirming = false } end, a(reduce_memory_use( - ack(fun msg_store_release/3, + ack(fun (_, _, _) -> ok end, fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) -> {_SeqId, State2} = publish(Msg, MsgPropsFun1(MsgProps), true, false, State1), State2; - ({IsPersistent, Guid, MsgProps}, State1) -> + ({IsPersistent, MsgId, MsgProps}, State1) -> #vqstate { msg_store_clients = MSCState } = State1, {{ok, Msg = #basic_message{}}, MSCState1} = - msg_store_read(MSCState, IsPersistent, Guid), + msg_store_read(MSCState, IsPersistent, MsgId), State2 = State1 #vqstate { msg_store_clients = MSCState1 }, {_SeqId, State3} = publish(Msg, MsgPropsFun1(MsgProps), true, true, State2), @@ -768,8 +797,8 @@ ram_duration(State = #vqstate { RamAckCount = gb_trees:size(RamAckIndex), Duration = %% msgs+acks / (msgs+acks/sec) == sec - case AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso - AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0 of + case (AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso + AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0) of true -> infinity; false -> (RamMsgCountPrev + RamMsgCount + RamAckCount + RamAckCountPrev) / @@ -896,12 +925,12 @@ cons_if(true, E, L) -> [E | L]; cons_if(false, _E, L) -> L. gb_sets_maybe_insert(false, _Val, Set) -> Set; -%% when requeueing, we re-add a guid to the unconfirmed set +%% when requeueing, we re-add a msg_id to the unconfirmed set gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set). -msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, +msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId }, MsgProps) -> - #msg_status { seq_id = SeqId, guid = Guid, msg = Msg, + #msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg, is_persistent = IsPersistent, is_delivered = false, msg_on_disk = false, index_on_disk = false, msg_props = MsgProps }. @@ -920,38 +949,33 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) -> end), Res. -msg_store_client_init(MsgStore, MsgOnDiskFun) -> - msg_store_client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun). +msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) -> + msg_store_client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun, Callback). -msg_store_client_init(MsgStore, Ref, MsgOnDiskFun) -> +msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) -> + CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE), rabbit_msg_store:client_init( - MsgStore, Ref, MsgOnDiskFun, - msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE)). + MsgStore, Ref, MsgOnDiskFun, fun () -> Callback(CloseFDsFun) end). -msg_store_write(MSCState, IsPersistent, Guid, Msg) -> +msg_store_write(MSCState, IsPersistent, MsgId, Msg) -> with_immutable_msg_store_state( MSCState, IsPersistent, - fun (MSCState1) -> rabbit_msg_store:write(Guid, Msg, MSCState1) end). + fun (MSCState1) -> rabbit_msg_store:write(MsgId, Msg, MSCState1) end). -msg_store_read(MSCState, IsPersistent, Guid) -> +msg_store_read(MSCState, IsPersistent, MsgId) -> with_msg_store_state( MSCState, IsPersistent, - fun (MSCState1) -> rabbit_msg_store:read(Guid, MSCState1) end). + fun (MSCState1) -> rabbit_msg_store:read(MsgId, MSCState1) end). -msg_store_remove(MSCState, IsPersistent, Guids) -> +msg_store_remove(MSCState, IsPersistent, MsgIds) -> with_immutable_msg_store_state( MSCState, IsPersistent, - fun (MCSState1) -> rabbit_msg_store:remove(Guids, MCSState1) end). + fun (MCSState1) -> rabbit_msg_store:remove(MsgIds, MCSState1) end). -msg_store_release(MSCState, IsPersistent, Guids) -> +msg_store_sync(MSCState, IsPersistent, MsgIds, Fun) -> with_immutable_msg_store_state( MSCState, IsPersistent, - fun (MCSState1) -> rabbit_msg_store:release(Guids, MCSState1) end). - -msg_store_sync(MSCState, IsPersistent, Guids, Callback) -> - with_immutable_msg_store_state( - MSCState, IsPersistent, - fun (MSCState1) -> rabbit_msg_store:sync(Guids, Callback, MSCState1) end). + fun (MSCState1) -> rabbit_msg_store:sync(MsgIds, Fun, MSCState1) end). msg_store_close_fds(MSCState, IsPersistent) -> with_msg_store_state( @@ -959,15 +983,9 @@ msg_store_close_fds(MSCState, IsPersistent) -> fun (MSCState1) -> rabbit_msg_store:close_all_indicated(MSCState1) end). msg_store_close_fds_fun(IsPersistent) -> - Self = self(), - fun () -> - rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - Self, - fun (State = #vqstate { msg_store_clients = MSCState }) -> - {ok, MSCState1} = - msg_store_close_fds(MSCState, IsPersistent), - {[], State #vqstate { msg_store_clients = MSCState1 }} - end) + fun (State = #vqstate { msg_store_clients = MSCState }) -> + {ok, MSCState1} = msg_store_close_fds(MSCState, IsPersistent), + State #vqstate { msg_store_clients = MSCState1 } end. maybe_write_delivered(false, _SeqId, IndexState) -> @@ -985,21 +1003,21 @@ store_tx(Txn, Tx) -> put({txn, Txn}, Tx). erase_tx(Txn) -> erase({txn, Txn}). -persistent_guids(Pubs) -> - [Guid || {#basic_message { guid = Guid, - is_persistent = true }, _MsgProps} <- Pubs]. +persistent_msg_ids(Pubs) -> + [MsgId || {#basic_message { id = MsgId, + is_persistent = true }, _MsgProps} <- Pubs]. betas_from_index_entries(List, TransientThreshold, IndexState) -> {Filtered, Delivers, Acks} = lists:foldr( - fun ({Guid, SeqId, MsgProps, IsPersistent, IsDelivered}, + fun ({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}, {Filtered1, Delivers1, Acks1}) -> case SeqId < TransientThreshold andalso not IsPersistent of true -> {Filtered1, cons_if(not IsDelivered, SeqId, Delivers1), [SeqId | Acks1]}; false -> {[m(#msg_status { msg = undefined, - guid = Guid, + msg_id = MsgId, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered, @@ -1053,7 +1071,7 @@ update_rate(Now, Then, Count, {OThen, OCount}) -> %%---------------------------------------------------------------------------- init(IsDurable, IndexState, DeltaCount, Terms, - PersistentClient, TransientClient) -> + AsyncCallback, SyncCallback, PersistentClient, TransientClient) -> {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount), @@ -1079,6 +1097,9 @@ init(IsDurable, IndexState, DeltaCount, Terms, durable = IsDurable, transient_threshold = NextSeqId, + async_callback = AsyncCallback, + sync_callback = SyncCallback, + len = DeltaCount1, persistent_count = DeltaCount1, @@ -1093,6 +1114,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, msgs_on_disk = gb_sets:new(), msg_indices_on_disk = gb_sets:new(), unconfirmed = gb_sets:new(), + confirmed = gb_sets:new(), ack_out_counter = 0, ack_in_counter = 0, ack_rates = blank_rate(Now, 0) }, @@ -1105,24 +1127,20 @@ blank_rate(Timestamp, IngressLength) -> avg_ingress = 0.0, timestamp = Timestamp }. -msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) -> - Self = self(), - F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( - Self, fun (StateN) -> {[], tx_commit_post_msg_store( - true, Pubs, AckTags, - Fun, MsgPropsFun, StateN)} - end) - end, - fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler( - fun () -> remove_persistent_messages( - PersistentGuids) - end, F) - end) +msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun, + AsyncCallback, SyncCallback) -> + case SyncCallback(fun (StateN) -> + tx_commit_post_msg_store(true, Pubs, AckTags, + Fun, MsgPropsFun, StateN) + end) of + ok -> ok; + error -> remove_persistent_messages(PersistentMsgIds, AsyncCallback) end. -remove_persistent_messages(Guids) -> - PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, undefined), - ok = rabbit_msg_store:remove(Guids, PersistentClient), +remove_persistent_messages(MsgIds, AsyncCallback) -> + PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, + undefined, AsyncCallback), + ok = rabbit_msg_store:remove(MsgIds, PersistentClient), rabbit_msg_store:client_delete_and_terminate(PersistentClient). tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun, @@ -1140,7 +1158,7 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun, case dict:fetch(AckTag, PA) of #msg_status {} -> false; - {IsPersistent, _Guid, _MsgProps} -> + {IsPersistent, _MsgId, _MsgProps} -> IsPersistent end]; false -> [] @@ -1206,38 +1224,38 @@ purge_betas_and_deltas(LensByStore, end. remove_queue_entries(Fold, Q, LensByStore, IndexState, MSCState) -> - {GuidsByStore, Delivers, Acks} = + {MsgIdsByStore, Delivers, Acks} = Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q), - ok = orddict:fold(fun (IsPersistent, Guids, ok) -> - msg_store_remove(MSCState, IsPersistent, Guids) - end, ok, GuidsByStore), - {sum_guids_by_store_to_len(LensByStore, GuidsByStore), + ok = orddict:fold(fun (IsPersistent, MsgIds, ok) -> + msg_store_remove(MSCState, IsPersistent, MsgIds) + end, ok, MsgIdsByStore), + {sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore), rabbit_queue_index:ack(Acks, rabbit_queue_index:deliver(Delivers, IndexState))}. remove_queue_entries1( - #msg_status { guid = Guid, seq_id = SeqId, + #msg_status { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk, is_persistent = IsPersistent }, - {GuidsByStore, Delivers, Acks}) -> + {MsgIdsByStore, Delivers, Acks}) -> {case MsgOnDisk of - true -> rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore); - false -> GuidsByStore + true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); + false -> MsgIdsByStore end, cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), cons_if(IndexOnDisk, SeqId, Acks)}. -sum_guids_by_store_to_len(LensByStore, GuidsByStore) -> +sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore) -> orddict:fold( - fun (IsPersistent, Guids, LensByStore1) -> - orddict:update_counter(IsPersistent, length(Guids), LensByStore1) - end, LensByStore, GuidsByStore). + fun (IsPersistent, MsgIds, LensByStore1) -> + orddict:update_counter(IsPersistent, length(MsgIds), LensByStore1) + end, LensByStore, MsgIdsByStore). %%---------------------------------------------------------------------------- %% Internal gubbins for publishing %%---------------------------------------------------------------------------- -publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, +publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, IsDelivered, MsgOnDisk, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, @@ -1257,7 +1275,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) } end, PCount1 = PCount + one_if(IsPersistent1), - UC1 = gb_sets_maybe_insert(NeedsConfirming, Guid, UC), + UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), {SeqId, State2 #vqstate { next_seq_id = SeqId + 1, len = Len + 1, in_counter = InCount + 1, @@ -1269,14 +1287,14 @@ maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { msg_on_disk = true }, _MSCState) -> MsgStatus; maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { - msg = Msg, guid = Guid, + msg = Msg, msg_id = MsgId, is_persistent = IsPersistent }, MSCState) when Force orelse IsPersistent -> Msg1 = Msg #basic_message { %% don't persist any recoverable decoded properties content = rabbit_binary_parser:clear_decoded_content( Msg #basic_message.content)}, - ok = msg_store_write(MSCState, IsPersistent, Guid, Msg1), + ok = msg_store_write(MSCState, IsPersistent, MsgId, Msg1), MsgStatus #msg_status { msg_on_disk = true }; maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) -> MsgStatus. @@ -1286,7 +1304,7 @@ maybe_write_index_to_disk(_Force, MsgStatus = #msg_status { true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION {MsgStatus, IndexState}; maybe_write_index_to_disk(Force, MsgStatus = #msg_status { - guid = Guid, + msg_id = MsgId, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered, @@ -1294,7 +1312,7 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status { when Force orelse IsPersistent -> true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION IndexState1 = rabbit_queue_index:publish( - Guid, SeqId, MsgProps, IsPersistent, IndexState), + MsgId, SeqId, MsgProps, IsPersistent, IndexState), {MsgStatus #msg_status { index_on_disk = true }, maybe_write_delivered(IsDelivered, SeqId, IndexState1)}; maybe_write_index_to_disk(_Force, MsgStatus, IndexState) -> @@ -1313,7 +1331,7 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, %%---------------------------------------------------------------------------- record_pending_ack(#msg_status { seq_id = SeqId, - guid = Guid, + msg_id = MsgId, is_persistent = IsPersistent, msg_on_disk = MsgOnDisk, msg_props = MsgProps } = MsgStatus, @@ -1322,8 +1340,8 @@ record_pending_ack(#msg_status { seq_id = SeqId, ack_in_counter = AckInCount}) -> {AckEntry, RAI1} = case MsgOnDisk of - true -> {{IsPersistent, Guid, MsgProps}, RAI}; - false -> {MsgStatus, gb_trees:insert(SeqId, Guid, RAI)} + true -> {{IsPersistent, MsgId, MsgProps}, RAI}; + false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)} end, PA1 = dict:store(SeqId, AckEntry, PA), State #vqstate { pending_ack = PA1, @@ -1334,28 +1352,28 @@ remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, index_state = IndexState, msg_store_clients = MSCState }) -> - {PersistentSeqIds, GuidsByStore} = + {PersistentSeqIds, MsgIdsByStore} = dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA), State1 = State #vqstate { pending_ack = dict:new(), ram_ack_index = gb_trees:empty() }, case KeepPersistent of - true -> case orddict:find(false, GuidsByStore) of - error -> State1; - {ok, Guids} -> ok = msg_store_remove(MSCState, false, - Guids), + true -> case orddict:find(false, MsgIdsByStore) of + error -> State1; + {ok, MsgIds} -> ok = msg_store_remove(MSCState, false, + MsgIds), State1 end; false -> IndexState1 = rabbit_queue_index:ack(PersistentSeqIds, IndexState), - [ok = msg_store_remove(MSCState, IsPersistent, Guids) - || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)], + [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) + || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], State1 #vqstate { index_state = IndexState1 } end. ack(_MsgStoreFun, _Fun, [], State) -> State; ack(MsgStoreFun, Fun, AckTags, State) -> - {{PersistentSeqIds, GuidsByStore}, + {{PersistentSeqIds, MsgIdsByStore}, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, persistent_count = PCount, @@ -1371,10 +1389,10 @@ ack(MsgStoreFun, Fun, AckTags, State) -> gb_trees:delete_any(SeqId, RAI)})} end, {accumulate_ack_init(), State}, AckTags), IndexState1 = rabbit_queue_index:ack(PersistentSeqIds, IndexState), - [ok = MsgStoreFun(MSCState, IsPersistent, Guids) - || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)], - PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len( - orddict:new(), GuidsByStore)), + [ok = MsgStoreFun(MSCState, IsPersistent, MsgIds) + || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], + PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len( + orddict:new(), MsgIdsByStore)), State1 #vqstate { index_state = IndexState1, persistent_count = PCount1, ack_out_counter = AckOutCount + length(AckTags) }. @@ -1384,12 +1402,12 @@ accumulate_ack_init() -> {[], orddict:new()}. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, index_on_disk = false }, - {PersistentSeqIdsAcc, GuidsByStore}) -> - {PersistentSeqIdsAcc, GuidsByStore}; -accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps}, - {PersistentSeqIdsAcc, GuidsByStore}) -> + {PersistentSeqIdsAcc, MsgIdsByStore}) -> + {PersistentSeqIdsAcc, MsgIdsByStore}; +accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps}, + {PersistentSeqIdsAcc, MsgIdsByStore}) -> {cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc), - rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore)}. + rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore)}. find_persistent_count(LensByStore) -> case orddict:find(true, LensByStore) of @@ -1408,12 +1426,14 @@ confirm_commit_index(State = #vqstate { index_state = IndexState }) -> false -> State end. -remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> - State #vqstate { msgs_on_disk = gb_sets:difference(MOD, GuidSet), - msg_indices_on_disk = gb_sets:difference(MIOD, GuidSet), - unconfirmed = gb_sets:difference(UC, GuidSet) }. +record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC, + confirmed = C }) -> + State #vqstate { msgs_on_disk = gb_sets:difference(MOD, MsgIdSet), + msg_indices_on_disk = gb_sets:difference(MIOD, MsgIdSet), + unconfirmed = gb_sets:difference(UC, MsgIdSet), + confirmed = gb_sets:union (C, MsgIdSet) }. needs_index_sync(#vqstate { msg_indices_on_disk = MIOD, unconfirmed = UC }) -> @@ -1430,38 +1450,32 @@ needs_index_sync(#vqstate { msg_indices_on_disk = MIOD, %% subtraction. not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)). -msgs_confirmed(GuidSet, State) -> - {gb_sets:to_list(GuidSet), remove_confirms(GuidSet, State)}. - -blind_confirm(QPid, GuidSet) -> - rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - QPid, fun (State) -> msgs_confirmed(GuidSet, State) end). - -msgs_written_to_disk(QPid, GuidSet, removed) -> - blind_confirm(QPid, GuidSet); -msgs_written_to_disk(QPid, GuidSet, written) -> - rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - QPid, fun (State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> - msgs_confirmed(gb_sets:intersection(GuidSet, MIOD), - State #vqstate { - msgs_on_disk = - gb_sets:intersection( - gb_sets:union(MOD, GuidSet), UC) }) - end). - -msg_indices_written_to_disk(QPid, GuidSet) -> - rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - QPid, fun (State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> - msgs_confirmed(gb_sets:intersection(GuidSet, MOD), - State #vqstate { - msg_indices_on_disk = - gb_sets:intersection( - gb_sets:union(MIOD, GuidSet), UC) }) - end). +blind_confirm(Callback, MsgIdSet) -> + Callback(fun (State) -> record_confirms(MsgIdSet, State) end). + +msgs_written_to_disk(Callback, MsgIdSet, removed) -> + blind_confirm(Callback, MsgIdSet); +msgs_written_to_disk(Callback, MsgIdSet, written) -> + Callback(fun (State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> + Confirmed = gb_sets:intersection(UC, MsgIdSet), + record_confirms(gb_sets:intersection(MsgIdSet, MIOD), + State #vqstate { + msgs_on_disk = + gb_sets:union(MOD, Confirmed) }) + end). + +msg_indices_written_to_disk(Callback, MsgIdSet) -> + Callback(fun (State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> + Confirmed = gb_sets:intersection(UC, MsgIdSet), + record_confirms(gb_sets:intersection(MsgIdSet, MOD), + State #vqstate { + msg_indices_on_disk = + gb_sets:union(MIOD, Confirmed) }) + end). %%---------------------------------------------------------------------------- %% Phase changes @@ -1538,17 +1552,16 @@ limit_ram_acks(Quota, State = #vqstate { pending_ack = PA, true -> {Quota, State}; false -> - {SeqId, Guid, RAI1} = gb_trees:take_largest(RAI), + {SeqId, MsgId, RAI1} = gb_trees:take_largest(RAI), MsgStatus = #msg_status { - guid = Guid, %% ASSERTION + msg_id = MsgId, %% ASSERTION is_persistent = false, %% ASSERTION msg_props = MsgProps } = dict:fetch(SeqId, PA), {_, State1} = maybe_write_to_disk(true, false, MsgStatus, State), + PA1 = dict:store(SeqId, {false, MsgId, MsgProps}, PA), limit_ram_acks(Quota - 1, - State1 #vqstate { - pending_ack = - dict:store(SeqId, {false, Guid, MsgProps}, PA), - ram_ack_index = RAI1 }) + State1 #vqstate { pending_ack = PA1, + ram_ack_index = RAI1 }) end. @@ -1801,3 +1814,27 @@ push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) -> push_betas_to_deltas( Generator, Limit, Qa, Count + 1, RamIndexCount1, IndexState1) end. + +%%---------------------------------------------------------------------------- +%% Upgrading +%%---------------------------------------------------------------------------- + +multiple_routing_keys() -> + transform_storage( + fun ({basic_message, ExchangeName, Routing_Key, Content, + MsgId, Persistent}) -> + {ok, {basic_message, ExchangeName, [Routing_Key], Content, + MsgId, Persistent}}; + (_) -> {error, corrupt_message} + end), + ok. + + +%% Assumes message store is not running +transform_storage(TransformFun) -> + transform_store(?PERSISTENT_MSG_STORE, TransformFun), + transform_store(?TRANSIENT_MSG_STORE, TransformFun). + +transform_store(Store, TransformFun) -> + rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store), + rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun). |