summaryrefslogtreecommitdiff
path: root/src/rabbit_variable_queue.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_variable_queue.erl')
-rw-r--r--src/rabbit_variable_queue.erl425
1 files changed, 231 insertions, 194 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 7142d560..ff7252fd 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,18 +16,18 @@
-module(rabbit_variable_queue).
--export([init/3, terminate/1, delete_and_terminate/1,
- purge/1, publish/3, publish_delivered/4, fetch/2, ack/2,
- tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4,
+-export([init/5, terminate/1, delete_and_terminate/1,
+ purge/1, publish/3, publish_delivered/4, drain_confirmed/1,
+ fetch/2, ack/2, tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4,
requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
- status/1]).
+ status/1, multiple_routing_keys/0]).
-export([start/1, stop/0]).
%% exported for testing only
--export([start_msg_store/2, stop_msg_store/0, init/5]).
+-export([start_msg_store/2, stop_msg_store/0, init/7]).
%%----------------------------------------------------------------------------
%% Definitions:
@@ -150,13 +150,16 @@
%% responsive.
%%
%% In the queue we keep track of both messages that are pending
-%% delivery and messages that are pending acks. This ensures that
-%% purging (deleting the former) and deletion (deleting the former and
-%% the latter) are both cheap and do require any scanning through qi
-%% segments.
+%% delivery and messages that are pending acks. In the event of a
+%% queue purge, we only need to load qi segments if the queue has
+%% elements in deltas (i.e. it came under significant memory
+%% pressure). In the event of a queue deletion, in addition to the
+%% preceding, by keeping track of pending acks in RAM, we do not need
+%% to search through qi segments looking for messages that are yet to
+%% be acknowledged.
%%
%% Pending acks are recorded in memory either as the tuple {SeqId,
-%% Guid, MsgProps} (tuple-form) or as the message itself (message-
+%% MsgId, MsgProps} (tuple-form) or as the message itself (message-
%% form). Acks for persistent messages are always stored in the tuple-
%% form. Acks for transient messages are also stored in tuple-form if
%% the message has been sent to disk as part of the memory reduction
@@ -238,6 +241,9 @@
durable,
transient_threshold,
+ async_callback,
+ sync_callback,
+
len,
persistent_count,
@@ -252,6 +258,7 @@
msgs_on_disk,
msg_indices_on_disk,
unconfirmed,
+ confirmed,
ack_out_counter,
ack_in_counter,
ack_rates
@@ -261,20 +268,20 @@
-record(msg_status,
{ seq_id,
- guid,
+ msg_id,
msg,
is_persistent,
is_delivered,
msg_on_disk,
index_on_disk,
msg_props
- }).
+ }).
-record(delta,
{ start_seq_id, %% start_seq_id is inclusive
count,
end_seq_id %% end_seq_id is exclusive
- }).
+ }).
-record(tx, { pending_messages, pending_acks }).
@@ -294,6 +301,8 @@
%%----------------------------------------------------------------------------
+-rabbit_upgrade({multiple_routing_keys, local, []}).
+
-ifdef(use_specs).
-type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
@@ -330,11 +339,14 @@
{any(), binary()}},
on_sync :: sync(),
durable :: boolean(),
+ transient_threshold :: non_neg_integer(),
+
+ async_callback :: async_callback(),
+ sync_callback :: sync_callback(),
len :: non_neg_integer(),
persistent_count :: non_neg_integer(),
- transient_threshold :: non_neg_integer(),
target_ram_count :: non_neg_integer() | 'infinity',
ram_msg_count :: non_neg_integer(),
ram_msg_count_prev :: non_neg_integer(),
@@ -345,12 +357,15 @@
msgs_on_disk :: gb_set(),
msg_indices_on_disk :: gb_set(),
unconfirmed :: gb_set(),
+ confirmed :: gb_set(),
ack_out_counter :: non_neg_integer(),
ack_in_counter :: non_neg_integer(),
ack_rates :: rates() }).
-include("rabbit_backing_queue_spec.hrl").
+-spec(multiple_routing_keys/0 :: () -> 'ok').
+
-endif.
-define(BLANK_DELTA, #delta { start_seq_id = undefined,
@@ -393,25 +408,26 @@ stop_msg_store() ->
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
-init(QueueName, IsDurable, Recover) ->
- Self = self(),
- init(QueueName, IsDurable, Recover,
- fun (Guids, ActionTaken) ->
- msgs_written_to_disk(Self, Guids, ActionTaken)
+init(QueueName, IsDurable, Recover, AsyncCallback, SyncCallback) ->
+ init(QueueName, IsDurable, Recover, AsyncCallback, SyncCallback,
+ fun (MsgIds, ActionTaken) ->
+ msgs_written_to_disk(AsyncCallback, MsgIds, ActionTaken)
end,
- fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end).
+ fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end).
-init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) ->
+init(QueueName, IsDurable, false, AsyncCallback, SyncCallback,
+ MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
- init(IsDurable, IndexState, 0, [],
+ init(IsDurable, IndexState, 0, [], AsyncCallback, SyncCallback,
case IsDurable of
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
- MsgOnDiskFun);
+ MsgOnDiskFun, AsyncCallback);
false -> undefined
end,
- msg_store_client_init(?TRANSIENT_MSG_STORE, undefined));
+ msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback));
-init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) ->
+init(QueueName, true, true, AsyncCallback, SyncCallback,
+ MsgOnDiskFun, MsgIdxOnDiskFun) ->
Terms = rabbit_queue_index:shutdown_terms(QueueName),
{PRef, TRef, Terms1} =
case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of
@@ -421,18 +437,18 @@ init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) ->
_ -> {rabbit_guid:guid(), rabbit_guid:guid(), []}
end,
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
- MsgOnDiskFun),
+ MsgOnDiskFun, AsyncCallback),
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, TRef,
- undefined),
+ undefined, AsyncCallback),
{DeltaCount, IndexState} =
rabbit_queue_index:recover(
QueueName, Terms1,
rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
- fun (Guid) ->
- rabbit_msg_store:contains(Guid, PersistentClient)
+ fun (MsgId) ->
+ rabbit_msg_store:contains(MsgId, PersistentClient)
end,
MsgIdxOnDiskFun),
- init(true, IndexState, DeltaCount, Terms1,
+ init(true, IndexState, DeltaCount, Terms1, AsyncCallback, SyncCallback,
PersistentClient, TransientClient).
terminate(State) ->
@@ -505,12 +521,16 @@ publish(Msg, MsgProps, State) ->
{_SeqId, State1} = publish(Msg, MsgProps, false, false, State),
a(reduce_memory_use(State1)).
-publish_delivered(false, #basic_message { guid = Guid },
- _MsgProps, State = #vqstate { len = 0 }) ->
- blind_confirm(self(), gb_sets:singleton(Guid)),
+publish_delivered(false, #basic_message { id = MsgId },
+ #message_properties { needs_confirming = NeedsConfirming },
+ State = #vqstate { async_callback = Callback, len = 0 }) ->
+ case NeedsConfirming of
+ true -> blind_confirm(Callback, gb_sets:singleton(MsgId));
+ false -> ok
+ end,
{undefined, a(State)};
publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
- guid = Guid },
+ id = MsgId },
MsgProps = #message_properties {
needs_confirming = NeedsConfirming },
State = #vqstate { len = 0,
@@ -526,7 +546,7 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = record_pending_ack(m(MsgStatus1), State1),
PCount1 = PCount + one_if(IsPersistent1),
- UC1 = gb_sets_maybe_insert(NeedsConfirming, Guid, UC),
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
{SeqId, a(reduce_memory_use(
State2 #vqstate { next_seq_id = SeqId + 1,
out_counter = OutCount + 1,
@@ -534,9 +554,12 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
persistent_count = PCount1,
unconfirmed = UC1 }))}.
+drain_confirmed(State = #vqstate { confirmed = C }) ->
+ {gb_sets:to_list(C), State #vqstate { confirmed = gb_sets:new() }}.
+
dropwhile(Pred, State) ->
{_OkOrEmpty, State1} = dropwhile1(Pred, State),
- State1.
+ a(State1).
dropwhile1(Pred, State) ->
internal_queue_out(
@@ -577,12 +600,12 @@ internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) ->
end.
read_msg(MsgStatus = #msg_status { msg = undefined,
- guid = Guid,
+ msg_id = MsgId,
is_persistent = IsPersistent },
State = #vqstate { ram_msg_count = RamMsgCount,
msg_store_clients = MSCState}) ->
{{ok, Msg = #basic_message {}}, MSCState1} =
- msg_store_read(MSCState, IsPersistent, Guid),
+ msg_store_read(MSCState, IsPersistent, MsgId),
{MsgStatus #msg_status { msg = Msg },
State #vqstate { ram_msg_count = RamMsgCount + 1,
msg_store_clients = MSCState1 }};
@@ -591,7 +614,7 @@ read_msg(MsgStatus, State) ->
internal_fetch(AckRequired, MsgStatus = #msg_status {
seq_id = SeqId,
- guid = Guid,
+ msg_id = MsgId,
msg = Msg,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
@@ -610,7 +633,7 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
%% 2. Remove from msg_store and queue index, if necessary
Rem = fun () ->
- ok = msg_store_remove(MSCState, IsPersistent, [Guid])
+ ok = msg_store_remove(MSCState, IsPersistent, [MsgId])
end,
Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
IndexState2 =
@@ -623,12 +646,12 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
%% 3. If an ack is required, add something sensible to PA
{AckTag, State1} = case AckRequired of
- true -> StateN = record_pending_ack(
- MsgStatus #msg_status {
- is_delivered = true }, State),
- {SeqId, StateN};
- false -> {undefined, State}
- end,
+ true -> StateN = record_pending_ack(
+ MsgStatus #msg_status {
+ is_delivered = true }, State),
+ {SeqId, StateN};
+ false -> {undefined, State}
+ end,
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
Len1 = Len - 1,
@@ -669,25 +692,31 @@ tx_rollback(Txn, State = #vqstate { durable = IsDurable,
#tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
erase_tx(Txn),
ok = case IsDurable of
- true -> msg_store_remove(MSCState, true, persistent_guids(Pubs));
+ true -> msg_store_remove(MSCState, true,
+ persistent_msg_ids(Pubs));
false -> ok
end,
{lists:append(AckTags), a(State)}.
tx_commit(Txn, Fun, MsgPropsFun,
State = #vqstate { durable = IsDurable,
+ async_callback = AsyncCallback,
+ sync_callback = SyncCallback,
msg_store_clients = MSCState }) ->
#tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
erase_tx(Txn),
AckTags1 = lists:append(AckTags),
- PersistentGuids = persistent_guids(Pubs),
- HasPersistentPubs = PersistentGuids =/= [],
+ PersistentMsgIds = persistent_msg_ids(Pubs),
+ HasPersistentPubs = PersistentMsgIds =/= [],
{AckTags1,
a(case IsDurable andalso HasPersistentPubs of
- true -> ok = msg_store_sync(
- MSCState, true, PersistentGuids,
- msg_store_callback(PersistentGuids, Pubs, AckTags1,
- Fun, MsgPropsFun)),
+ true -> MsgStoreCallback =
+ fun () -> msg_store_callback(
+ PersistentMsgIds, Pubs, AckTags1, Fun,
+ MsgPropsFun, AsyncCallback, SyncCallback)
+ end,
+ ok = msg_store_sync(MSCState, true, PersistentMsgIds,
+ fun () -> spawn(MsgStoreCallback) end),
State;
false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1,
Fun, MsgPropsFun, State)
@@ -699,15 +728,15 @@ requeue(AckTags, MsgPropsFun, State) ->
needs_confirming = false }
end,
a(reduce_memory_use(
- ack(fun msg_store_release/3,
+ ack(fun (_, _, _) -> ok end,
fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) ->
{_SeqId, State2} = publish(Msg, MsgPropsFun1(MsgProps),
true, false, State1),
State2;
- ({IsPersistent, Guid, MsgProps}, State1) ->
+ ({IsPersistent, MsgId, MsgProps}, State1) ->
#vqstate { msg_store_clients = MSCState } = State1,
{{ok, Msg = #basic_message{}}, MSCState1} =
- msg_store_read(MSCState, IsPersistent, Guid),
+ msg_store_read(MSCState, IsPersistent, MsgId),
State2 = State1 #vqstate { msg_store_clients = MSCState1 },
{_SeqId, State3} = publish(Msg, MsgPropsFun1(MsgProps),
true, true, State2),
@@ -768,8 +797,8 @@ ram_duration(State = #vqstate {
RamAckCount = gb_trees:size(RamAckIndex),
Duration = %% msgs+acks / (msgs+acks/sec) == sec
- case AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso
- AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0 of
+ case (AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso
+ AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0) of
true -> infinity;
false -> (RamMsgCountPrev + RamMsgCount +
RamAckCount + RamAckCountPrev) /
@@ -896,12 +925,12 @@ cons_if(true, E, L) -> [E | L];
cons_if(false, _E, L) -> L.
gb_sets_maybe_insert(false, _Val, Set) -> Set;
-%% when requeueing, we re-add a guid to the unconfirmed set
+%% when requeueing, we re-add a msg_id to the unconfirmed set
gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
-msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid },
+msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId },
MsgProps) ->
- #msg_status { seq_id = SeqId, guid = Guid, msg = Msg,
+ #msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg,
is_persistent = IsPersistent, is_delivered = false,
msg_on_disk = false, index_on_disk = false,
msg_props = MsgProps }.
@@ -920,38 +949,33 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
end),
Res.
-msg_store_client_init(MsgStore, MsgOnDiskFun) ->
- msg_store_client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun).
+msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) ->
+ msg_store_client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun, Callback).
-msg_store_client_init(MsgStore, Ref, MsgOnDiskFun) ->
+msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) ->
+ CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
rabbit_msg_store:client_init(
- MsgStore, Ref, MsgOnDiskFun,
- msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE)).
+ MsgStore, Ref, MsgOnDiskFun, fun () -> Callback(CloseFDsFun) end).
-msg_store_write(MSCState, IsPersistent, Guid, Msg) ->
+msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
- fun (MSCState1) -> rabbit_msg_store:write(Guid, Msg, MSCState1) end).
+ fun (MSCState1) -> rabbit_msg_store:write(MsgId, Msg, MSCState1) end).
-msg_store_read(MSCState, IsPersistent, Guid) ->
+msg_store_read(MSCState, IsPersistent, MsgId) ->
with_msg_store_state(
MSCState, IsPersistent,
- fun (MSCState1) -> rabbit_msg_store:read(Guid, MSCState1) end).
+ fun (MSCState1) -> rabbit_msg_store:read(MsgId, MSCState1) end).
-msg_store_remove(MSCState, IsPersistent, Guids) ->
+msg_store_remove(MSCState, IsPersistent, MsgIds) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
- fun (MCSState1) -> rabbit_msg_store:remove(Guids, MCSState1) end).
+ fun (MCSState1) -> rabbit_msg_store:remove(MsgIds, MCSState1) end).
-msg_store_release(MSCState, IsPersistent, Guids) ->
+msg_store_sync(MSCState, IsPersistent, MsgIds, Fun) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
- fun (MCSState1) -> rabbit_msg_store:release(Guids, MCSState1) end).
-
-msg_store_sync(MSCState, IsPersistent, Guids, Callback) ->
- with_immutable_msg_store_state(
- MSCState, IsPersistent,
- fun (MSCState1) -> rabbit_msg_store:sync(Guids, Callback, MSCState1) end).
+ fun (MSCState1) -> rabbit_msg_store:sync(MsgIds, Fun, MSCState1) end).
msg_store_close_fds(MSCState, IsPersistent) ->
with_msg_store_state(
@@ -959,15 +983,9 @@ msg_store_close_fds(MSCState, IsPersistent) ->
fun (MSCState1) -> rabbit_msg_store:close_all_indicated(MSCState1) end).
msg_store_close_fds_fun(IsPersistent) ->
- Self = self(),
- fun () ->
- rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- Self,
- fun (State = #vqstate { msg_store_clients = MSCState }) ->
- {ok, MSCState1} =
- msg_store_close_fds(MSCState, IsPersistent),
- {[], State #vqstate { msg_store_clients = MSCState1 }}
- end)
+ fun (State = #vqstate { msg_store_clients = MSCState }) ->
+ {ok, MSCState1} = msg_store_close_fds(MSCState, IsPersistent),
+ State #vqstate { msg_store_clients = MSCState1 }
end.
maybe_write_delivered(false, _SeqId, IndexState) ->
@@ -985,21 +1003,21 @@ store_tx(Txn, Tx) -> put({txn, Txn}, Tx).
erase_tx(Txn) -> erase({txn, Txn}).
-persistent_guids(Pubs) ->
- [Guid || {#basic_message { guid = Guid,
- is_persistent = true }, _MsgProps} <- Pubs].
+persistent_msg_ids(Pubs) ->
+ [MsgId || {#basic_message { id = MsgId,
+ is_persistent = true }, _MsgProps} <- Pubs].
betas_from_index_entries(List, TransientThreshold, IndexState) ->
{Filtered, Delivers, Acks} =
lists:foldr(
- fun ({Guid, SeqId, MsgProps, IsPersistent, IsDelivered},
+ fun ({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered},
{Filtered1, Delivers1, Acks1}) ->
case SeqId < TransientThreshold andalso not IsPersistent of
true -> {Filtered1,
cons_if(not IsDelivered, SeqId, Delivers1),
[SeqId | Acks1]};
false -> {[m(#msg_status { msg = undefined,
- guid = Guid,
+ msg_id = MsgId,
seq_id = SeqId,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
@@ -1053,7 +1071,7 @@ update_rate(Now, Then, Count, {OThen, OCount}) ->
%%----------------------------------------------------------------------------
init(IsDurable, IndexState, DeltaCount, Terms,
- PersistentClient, TransientClient) ->
+ AsyncCallback, SyncCallback, PersistentClient, TransientClient) ->
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount),
@@ -1079,6 +1097,9 @@ init(IsDurable, IndexState, DeltaCount, Terms,
durable = IsDurable,
transient_threshold = NextSeqId,
+ async_callback = AsyncCallback,
+ sync_callback = SyncCallback,
+
len = DeltaCount1,
persistent_count = DeltaCount1,
@@ -1093,6 +1114,7 @@ init(IsDurable, IndexState, DeltaCount, Terms,
msgs_on_disk = gb_sets:new(),
msg_indices_on_disk = gb_sets:new(),
unconfirmed = gb_sets:new(),
+ confirmed = gb_sets:new(),
ack_out_counter = 0,
ack_in_counter = 0,
ack_rates = blank_rate(Now, 0) },
@@ -1105,24 +1127,20 @@ blank_rate(Timestamp, IngressLength) ->
avg_ingress = 0.0,
timestamp = Timestamp }.
-msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) ->
- Self = self(),
- F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
- Self, fun (StateN) -> {[], tx_commit_post_msg_store(
- true, Pubs, AckTags,
- Fun, MsgPropsFun, StateN)}
- end)
- end,
- fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler(
- fun () -> remove_persistent_messages(
- PersistentGuids)
- end, F)
- end)
+msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun,
+ AsyncCallback, SyncCallback) ->
+ case SyncCallback(fun (StateN) ->
+ tx_commit_post_msg_store(true, Pubs, AckTags,
+ Fun, MsgPropsFun, StateN)
+ end) of
+ ok -> ok;
+ error -> remove_persistent_messages(PersistentMsgIds, AsyncCallback)
end.
-remove_persistent_messages(Guids) ->
- PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, undefined),
- ok = rabbit_msg_store:remove(Guids, PersistentClient),
+remove_persistent_messages(MsgIds, AsyncCallback) ->
+ PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE,
+ undefined, AsyncCallback),
+ ok = rabbit_msg_store:remove(MsgIds, PersistentClient),
rabbit_msg_store:client_delete_and_terminate(PersistentClient).
tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun,
@@ -1140,7 +1158,7 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun,
case dict:fetch(AckTag, PA) of
#msg_status {} ->
false;
- {IsPersistent, _Guid, _MsgProps} ->
+ {IsPersistent, _MsgId, _MsgProps} ->
IsPersistent
end];
false -> []
@@ -1206,38 +1224,38 @@ purge_betas_and_deltas(LensByStore,
end.
remove_queue_entries(Fold, Q, LensByStore, IndexState, MSCState) ->
- {GuidsByStore, Delivers, Acks} =
+ {MsgIdsByStore, Delivers, Acks} =
Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q),
- ok = orddict:fold(fun (IsPersistent, Guids, ok) ->
- msg_store_remove(MSCState, IsPersistent, Guids)
- end, ok, GuidsByStore),
- {sum_guids_by_store_to_len(LensByStore, GuidsByStore),
+ ok = orddict:fold(fun (IsPersistent, MsgIds, ok) ->
+ msg_store_remove(MSCState, IsPersistent, MsgIds)
+ end, ok, MsgIdsByStore),
+ {sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore),
rabbit_queue_index:ack(Acks,
rabbit_queue_index:deliver(Delivers, IndexState))}.
remove_queue_entries1(
- #msg_status { guid = Guid, seq_id = SeqId,
+ #msg_status { msg_id = MsgId, seq_id = SeqId,
is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
index_on_disk = IndexOnDisk, is_persistent = IsPersistent },
- {GuidsByStore, Delivers, Acks}) ->
+ {MsgIdsByStore, Delivers, Acks}) ->
{case MsgOnDisk of
- true -> rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore);
- false -> GuidsByStore
+ true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
+ false -> MsgIdsByStore
end,
cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
cons_if(IndexOnDisk, SeqId, Acks)}.
-sum_guids_by_store_to_len(LensByStore, GuidsByStore) ->
+sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore) ->
orddict:fold(
- fun (IsPersistent, Guids, LensByStore1) ->
- orddict:update_counter(IsPersistent, length(Guids), LensByStore1)
- end, LensByStore, GuidsByStore).
+ fun (IsPersistent, MsgIds, LensByStore1) ->
+ orddict:update_counter(IsPersistent, length(MsgIds), LensByStore1)
+ end, LensByStore, MsgIdsByStore).
%%----------------------------------------------------------------------------
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------
-publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
+publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
MsgProps = #message_properties { needs_confirming = NeedsConfirming },
IsDelivered, MsgOnDisk,
State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
@@ -1257,7 +1275,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) }
end,
PCount1 = PCount + one_if(IsPersistent1),
- UC1 = gb_sets_maybe_insert(NeedsConfirming, Guid, UC),
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
{SeqId, State2 #vqstate { next_seq_id = SeqId + 1,
len = Len + 1,
in_counter = InCount + 1,
@@ -1269,14 +1287,14 @@ maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
msg_on_disk = true }, _MSCState) ->
MsgStatus;
maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
- msg = Msg, guid = Guid,
+ msg = Msg, msg_id = MsgId,
is_persistent = IsPersistent }, MSCState)
when Force orelse IsPersistent ->
Msg1 = Msg #basic_message {
%% don't persist any recoverable decoded properties
content = rabbit_binary_parser:clear_decoded_content(
Msg #basic_message.content)},
- ok = msg_store_write(MSCState, IsPersistent, Guid, Msg1),
+ ok = msg_store_write(MSCState, IsPersistent, MsgId, Msg1),
MsgStatus #msg_status { msg_on_disk = true };
maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) ->
MsgStatus.
@@ -1286,7 +1304,7 @@ maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
{MsgStatus, IndexState};
maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
- guid = Guid,
+ msg_id = MsgId,
seq_id = SeqId,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
@@ -1294,7 +1312,7 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
when Force orelse IsPersistent ->
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
IndexState1 = rabbit_queue_index:publish(
- Guid, SeqId, MsgProps, IsPersistent, IndexState),
+ MsgId, SeqId, MsgProps, IsPersistent, IndexState),
{MsgStatus #msg_status { index_on_disk = true },
maybe_write_delivered(IsDelivered, SeqId, IndexState1)};
maybe_write_index_to_disk(_Force, MsgStatus, IndexState) ->
@@ -1313,7 +1331,7 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
%%----------------------------------------------------------------------------
record_pending_ack(#msg_status { seq_id = SeqId,
- guid = Guid,
+ msg_id = MsgId,
is_persistent = IsPersistent,
msg_on_disk = MsgOnDisk,
msg_props = MsgProps } = MsgStatus,
@@ -1322,8 +1340,8 @@ record_pending_ack(#msg_status { seq_id = SeqId,
ack_in_counter = AckInCount}) ->
{AckEntry, RAI1} =
case MsgOnDisk of
- true -> {{IsPersistent, Guid, MsgProps}, RAI};
- false -> {MsgStatus, gb_trees:insert(SeqId, Guid, RAI)}
+ true -> {{IsPersistent, MsgId, MsgProps}, RAI};
+ false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)}
end,
PA1 = dict:store(SeqId, AckEntry, PA),
State #vqstate { pending_ack = PA1,
@@ -1334,28 +1352,28 @@ remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
- {PersistentSeqIds, GuidsByStore} =
+ {PersistentSeqIds, MsgIdsByStore} =
dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA),
State1 = State #vqstate { pending_ack = dict:new(),
ram_ack_index = gb_trees:empty() },
case KeepPersistent of
- true -> case orddict:find(false, GuidsByStore) of
- error -> State1;
- {ok, Guids} -> ok = msg_store_remove(MSCState, false,
- Guids),
+ true -> case orddict:find(false, MsgIdsByStore) of
+ error -> State1;
+ {ok, MsgIds} -> ok = msg_store_remove(MSCState, false,
+ MsgIds),
State1
end;
false -> IndexState1 =
rabbit_queue_index:ack(PersistentSeqIds, IndexState),
- [ok = msg_store_remove(MSCState, IsPersistent, Guids)
- || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)],
+ [ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
+ || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
State1 #vqstate { index_state = IndexState1 }
end.
ack(_MsgStoreFun, _Fun, [], State) ->
State;
ack(MsgStoreFun, Fun, AckTags, State) ->
- {{PersistentSeqIds, GuidsByStore},
+ {{PersistentSeqIds, MsgIdsByStore},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
persistent_count = PCount,
@@ -1371,10 +1389,10 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
gb_trees:delete_any(SeqId, RAI)})}
end, {accumulate_ack_init(), State}, AckTags),
IndexState1 = rabbit_queue_index:ack(PersistentSeqIds, IndexState),
- [ok = MsgStoreFun(MSCState, IsPersistent, Guids)
- || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)],
- PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
- orddict:new(), GuidsByStore)),
+ [ok = MsgStoreFun(MSCState, IsPersistent, MsgIds)
+ || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
+ PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len(
+ orddict:new(), MsgIdsByStore)),
State1 #vqstate { index_state = IndexState1,
persistent_count = PCount1,
ack_out_counter = AckOutCount + length(AckTags) }.
@@ -1384,12 +1402,12 @@ accumulate_ack_init() -> {[], orddict:new()}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
index_on_disk = false },
- {PersistentSeqIdsAcc, GuidsByStore}) ->
- {PersistentSeqIdsAcc, GuidsByStore};
-accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps},
- {PersistentSeqIdsAcc, GuidsByStore}) ->
+ {PersistentSeqIdsAcc, MsgIdsByStore}) ->
+ {PersistentSeqIdsAcc, MsgIdsByStore};
+accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps},
+ {PersistentSeqIdsAcc, MsgIdsByStore}) ->
{cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc),
- rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore)}.
+ rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore)}.
find_persistent_count(LensByStore) ->
case orddict:find(true, LensByStore) of
@@ -1408,12 +1426,14 @@ confirm_commit_index(State = #vqstate { index_state = IndexState }) ->
false -> State
end.
-remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- State #vqstate { msgs_on_disk = gb_sets:difference(MOD, GuidSet),
- msg_indices_on_disk = gb_sets:difference(MIOD, GuidSet),
- unconfirmed = gb_sets:difference(UC, GuidSet) }.
+record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC,
+ confirmed = C }) ->
+ State #vqstate { msgs_on_disk = gb_sets:difference(MOD, MsgIdSet),
+ msg_indices_on_disk = gb_sets:difference(MIOD, MsgIdSet),
+ unconfirmed = gb_sets:difference(UC, MsgIdSet),
+ confirmed = gb_sets:union (C, MsgIdSet) }.
needs_index_sync(#vqstate { msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->
@@ -1430,38 +1450,32 @@ needs_index_sync(#vqstate { msg_indices_on_disk = MIOD,
%% subtraction.
not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)).
-msgs_confirmed(GuidSet, State) ->
- {gb_sets:to_list(GuidSet), remove_confirms(GuidSet, State)}.
-
-blind_confirm(QPid, GuidSet) ->
- rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun (State) -> msgs_confirmed(GuidSet, State) end).
-
-msgs_written_to_disk(QPid, GuidSet, removed) ->
- blind_confirm(QPid, GuidSet);
-msgs_written_to_disk(QPid, GuidSet, written) ->
- rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun (State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- msgs_confirmed(gb_sets:intersection(GuidSet, MIOD),
- State #vqstate {
- msgs_on_disk =
- gb_sets:intersection(
- gb_sets:union(MOD, GuidSet), UC) })
- end).
-
-msg_indices_written_to_disk(QPid, GuidSet) ->
- rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun (State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- msgs_confirmed(gb_sets:intersection(GuidSet, MOD),
- State #vqstate {
- msg_indices_on_disk =
- gb_sets:intersection(
- gb_sets:union(MIOD, GuidSet), UC) })
- end).
+blind_confirm(Callback, MsgIdSet) ->
+ Callback(fun (State) -> record_confirms(MsgIdSet, State) end).
+
+msgs_written_to_disk(Callback, MsgIdSet, removed) ->
+ blind_confirm(Callback, MsgIdSet);
+msgs_written_to_disk(Callback, MsgIdSet, written) ->
+ Callback(fun (State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
+ Confirmed = gb_sets:intersection(UC, MsgIdSet),
+ record_confirms(gb_sets:intersection(MsgIdSet, MIOD),
+ State #vqstate {
+ msgs_on_disk =
+ gb_sets:union(MOD, Confirmed) })
+ end).
+
+msg_indices_written_to_disk(Callback, MsgIdSet) ->
+ Callback(fun (State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
+ Confirmed = gb_sets:intersection(UC, MsgIdSet),
+ record_confirms(gb_sets:intersection(MsgIdSet, MOD),
+ State #vqstate {
+ msg_indices_on_disk =
+ gb_sets:union(MIOD, Confirmed) })
+ end).
%%----------------------------------------------------------------------------
%% Phase changes
@@ -1538,17 +1552,16 @@ limit_ram_acks(Quota, State = #vqstate { pending_ack = PA,
true ->
{Quota, State};
false ->
- {SeqId, Guid, RAI1} = gb_trees:take_largest(RAI),
+ {SeqId, MsgId, RAI1} = gb_trees:take_largest(RAI),
MsgStatus = #msg_status {
- guid = Guid, %% ASSERTION
+ msg_id = MsgId, %% ASSERTION
is_persistent = false, %% ASSERTION
msg_props = MsgProps } = dict:fetch(SeqId, PA),
{_, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
+ PA1 = dict:store(SeqId, {false, MsgId, MsgProps}, PA),
limit_ram_acks(Quota - 1,
- State1 #vqstate {
- pending_ack =
- dict:store(SeqId, {false, Guid, MsgProps}, PA),
- ram_ack_index = RAI1 })
+ State1 #vqstate { pending_ack = PA1,
+ ram_ack_index = RAI1 })
end.
@@ -1801,3 +1814,27 @@ push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) ->
push_betas_to_deltas(
Generator, Limit, Qa, Count + 1, RamIndexCount1, IndexState1)
end.
+
+%%----------------------------------------------------------------------------
+%% Upgrading
+%%----------------------------------------------------------------------------
+
+multiple_routing_keys() ->
+ transform_storage(
+ fun ({basic_message, ExchangeName, Routing_Key, Content,
+ MsgId, Persistent}) ->
+ {ok, {basic_message, ExchangeName, [Routing_Key], Content,
+ MsgId, Persistent}};
+ (_) -> {error, corrupt_message}
+ end),
+ ok.
+
+
+%% Assumes message store is not running
+transform_storage(TransformFun) ->
+ transform_store(?PERSISTENT_MSG_STORE, TransformFun),
+ transform_store(?TRANSIENT_MSG_STORE, TransformFun).
+
+transform_store(Store, TransformFun) ->
+ rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store),
+ rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun).