summaryrefslogtreecommitdiff
path: root/src/rabbit_variable_queue.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_variable_queue.erl')
-rw-r--r--src/rabbit_variable_queue.erl220
1 files changed, 110 insertions, 110 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 591e5a66..be6691e9 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -156,7 +156,7 @@
%% segments.
%%
%% Pending acks are recorded in memory either as the tuple {SeqId,
-%% Guid, MsgProps} (tuple-form) or as the message itself (message-
+%% MsgId, MsgProps} (tuple-form) or as the message itself (message-
%% form). Acks for persistent messages are always stored in the tuple-
%% form. Acks for transient messages are also stored in tuple-form if
%% the message has been sent to disk as part of the memory reduction
@@ -261,7 +261,7 @@
-record(msg_status,
{ seq_id,
- guid,
+ msg_id,
msg,
is_persistent,
is_delivered,
@@ -400,10 +400,10 @@ stop_msg_store() ->
init(QueueName, IsDurable, Recover) ->
Self = self(),
init(QueueName, IsDurable, Recover,
- fun (Guids, ActionTaken) ->
- msgs_written_to_disk(Self, Guids, ActionTaken)
+ fun (MsgIds, ActionTaken) ->
+ msgs_written_to_disk(Self, MsgIds, ActionTaken)
end,
- fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end).
+ fun (MsgIds) -> msg_indices_written_to_disk(Self, MsgIds) end).
init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
@@ -432,8 +432,8 @@ init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) ->
rabbit_queue_index:recover(
QueueName, Terms1,
rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
- fun (Guid) ->
- rabbit_msg_store:contains(Guid, PersistentClient)
+ fun (MsgId) ->
+ rabbit_msg_store:contains(MsgId, PersistentClient)
end,
MsgIdxOnDiskFun),
init(true, IndexState, DeltaCount, Terms1,
@@ -509,16 +509,16 @@ publish(Msg, MsgProps, State) ->
{_SeqId, State1} = publish(Msg, MsgProps, false, false, State),
a(reduce_memory_use(State1)).
-publish_delivered(false, #basic_message { guid = Guid },
+publish_delivered(false, #basic_message { id = MsgId },
#message_properties { needs_confirming = NeedsConfirming },
State = #vqstate { len = 0 }) ->
case NeedsConfirming of
- true -> blind_confirm(self(), gb_sets:singleton(Guid));
+ true -> blind_confirm(self(), gb_sets:singleton(MsgId));
false -> ok
end,
{undefined, a(State)};
publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
- guid = Guid },
+ id = MsgId },
MsgProps = #message_properties {
needs_confirming = NeedsConfirming },
State = #vqstate { len = 0,
@@ -534,7 +534,7 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = record_pending_ack(m(MsgStatus1), State1),
PCount1 = PCount + one_if(IsPersistent1),
- UC1 = gb_sets_maybe_insert(NeedsConfirming, Guid, UC),
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
{SeqId, a(reduce_memory_use(
State2 #vqstate { next_seq_id = SeqId + 1,
out_counter = OutCount + 1,
@@ -585,12 +585,12 @@ internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) ->
end.
read_msg(MsgStatus = #msg_status { msg = undefined,
- guid = Guid,
+ msg_id = MsgId,
is_persistent = IsPersistent },
State = #vqstate { ram_msg_count = RamMsgCount,
msg_store_clients = MSCState}) ->
{{ok, Msg = #basic_message {}}, MSCState1} =
- msg_store_read(MSCState, IsPersistent, Guid),
+ msg_store_read(MSCState, IsPersistent, MsgId),
{MsgStatus #msg_status { msg = Msg },
State #vqstate { ram_msg_count = RamMsgCount + 1,
msg_store_clients = MSCState1 }};
@@ -599,7 +599,7 @@ read_msg(MsgStatus, State) ->
internal_fetch(AckRequired, MsgStatus = #msg_status {
seq_id = SeqId,
- guid = Guid,
+ msg_id = MsgId,
msg = Msg,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
@@ -618,7 +618,7 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
%% 2. Remove from msg_store and queue index, if necessary
Rem = fun () ->
- ok = msg_store_remove(MSCState, IsPersistent, [Guid])
+ ok = msg_store_remove(MSCState, IsPersistent, [MsgId])
end,
Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
IndexState2 =
@@ -677,7 +677,8 @@ tx_rollback(Txn, State = #vqstate { durable = IsDurable,
#tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
erase_tx(Txn),
ok = case IsDurable of
- true -> msg_store_remove(MSCState, true, persistent_guids(Pubs));
+ true -> msg_store_remove(MSCState, true,
+ persistent_msg_ids(Pubs));
false -> ok
end,
{lists:append(AckTags), a(State)}.
@@ -688,13 +689,13 @@ tx_commit(Txn, Fun, MsgPropsFun,
#tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
erase_tx(Txn),
AckTags1 = lists:append(AckTags),
- PersistentGuids = persistent_guids(Pubs),
- HasPersistentPubs = PersistentGuids =/= [],
+ PersistentMsgIds = persistent_msg_ids(Pubs),
+ HasPersistentPubs = PersistentMsgIds =/= [],
{AckTags1,
a(case IsDurable andalso HasPersistentPubs of
true -> ok = msg_store_sync(
- MSCState, true, PersistentGuids,
- msg_store_callback(PersistentGuids, Pubs, AckTags1,
+ MSCState, true, PersistentMsgIds,
+ msg_store_callback(PersistentMsgIds, Pubs, AckTags1,
Fun, MsgPropsFun)),
State;
false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1,
@@ -712,10 +713,10 @@ requeue(AckTags, MsgPropsFun, State) ->
{_SeqId, State2} = publish(Msg, MsgPropsFun1(MsgProps),
true, false, State1),
State2;
- ({IsPersistent, Guid, MsgProps}, State1) ->
+ ({IsPersistent, MsgId, MsgProps}, State1) ->
#vqstate { msg_store_clients = MSCState } = State1,
{{ok, Msg = #basic_message{}}, MSCState1} =
- msg_store_read(MSCState, IsPersistent, Guid),
+ msg_store_read(MSCState, IsPersistent, MsgId),
State2 = State1 #vqstate { msg_store_clients = MSCState1 },
{_SeqId, State3} = publish(Msg, MsgPropsFun1(MsgProps),
true, true, State2),
@@ -904,12 +905,12 @@ cons_if(true, E, L) -> [E | L];
cons_if(false, _E, L) -> L.
gb_sets_maybe_insert(false, _Val, Set) -> Set;
-%% when requeueing, we re-add a guid to the unconfirmed set
+%% when requeueing, we re-add a msg_id to the unconfirmed set
gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
-msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid },
+msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId },
MsgProps) ->
- #msg_status { seq_id = SeqId, guid = Guid, msg = Msg,
+ #msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg,
is_persistent = IsPersistent, is_delivered = false,
msg_on_disk = false, index_on_disk = false,
msg_props = MsgProps }.
@@ -936,30 +937,30 @@ msg_store_client_init(MsgStore, Ref, MsgOnDiskFun) ->
MsgStore, Ref, MsgOnDiskFun,
msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE)).
-msg_store_write(MSCState, IsPersistent, Guid, Msg) ->
+msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
- fun (MSCState1) -> rabbit_msg_store:write(Guid, Msg, MSCState1) end).
+ fun (MSCState1) -> rabbit_msg_store:write(MsgId, Msg, MSCState1) end).
-msg_store_read(MSCState, IsPersistent, Guid) ->
+msg_store_read(MSCState, IsPersistent, MsgId) ->
with_msg_store_state(
MSCState, IsPersistent,
- fun (MSCState1) -> rabbit_msg_store:read(Guid, MSCState1) end).
+ fun (MSCState1) -> rabbit_msg_store:read(MsgId, MSCState1) end).
-msg_store_remove(MSCState, IsPersistent, Guids) ->
+msg_store_remove(MSCState, IsPersistent, MsgIds) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
- fun (MCSState1) -> rabbit_msg_store:remove(Guids, MCSState1) end).
+ fun (MCSState1) -> rabbit_msg_store:remove(MsgIds, MCSState1) end).
-msg_store_release(MSCState, IsPersistent, Guids) ->
+msg_store_release(MSCState, IsPersistent, MsgIds) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
- fun (MCSState1) -> rabbit_msg_store:release(Guids, MCSState1) end).
+ fun (MCSState1) -> rabbit_msg_store:release(MsgIds, MCSState1) end).
-msg_store_sync(MSCState, IsPersistent, Guids, Callback) ->
+msg_store_sync(MSCState, IsPersistent, MsgIds, Fun) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
- fun (MSCState1) -> rabbit_msg_store:sync(Guids, Callback, MSCState1) end).
+ fun (MSCState1) -> rabbit_msg_store:sync(MsgIds, Fun, MSCState1) end).
msg_store_close_fds(MSCState, IsPersistent) ->
with_msg_store_state(
@@ -993,21 +994,21 @@ store_tx(Txn, Tx) -> put({txn, Txn}, Tx).
erase_tx(Txn) -> erase({txn, Txn}).
-persistent_guids(Pubs) ->
- [Guid || {#basic_message { guid = Guid,
- is_persistent = true }, _MsgProps} <- Pubs].
+persistent_msg_ids(Pubs) ->
+ [MsgId || {#basic_message { id = MsgId,
+ is_persistent = true }, _MsgProps} <- Pubs].
betas_from_index_entries(List, TransientThreshold, IndexState) ->
{Filtered, Delivers, Acks} =
lists:foldr(
- fun ({Guid, SeqId, MsgProps, IsPersistent, IsDelivered},
+ fun ({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered},
{Filtered1, Delivers1, Acks1}) ->
case SeqId < TransientThreshold andalso not IsPersistent of
true -> {Filtered1,
cons_if(not IsDelivered, SeqId, Delivers1),
[SeqId | Acks1]};
false -> {[m(#msg_status { msg = undefined,
- guid = Guid,
+ msg_id = MsgId,
seq_id = SeqId,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
@@ -1113,7 +1114,7 @@ blank_rate(Timestamp, IngressLength) ->
avg_ingress = 0.0,
timestamp = Timestamp }.
-msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) ->
+msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun) ->
Self = self(),
F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
Self, fun (StateN) -> {[], tx_commit_post_msg_store(
@@ -1123,14 +1124,14 @@ msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) ->
end,
fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler(
fun () -> remove_persistent_messages(
- PersistentGuids)
+ PersistentMsgIds)
end, F)
end)
end.
-remove_persistent_messages(Guids) ->
+remove_persistent_messages(MsgIds) ->
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, undefined),
- ok = rabbit_msg_store:remove(Guids, PersistentClient),
+ ok = rabbit_msg_store:remove(MsgIds, PersistentClient),
rabbit_msg_store:client_delete_and_terminate(PersistentClient).
tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun,
@@ -1148,7 +1149,7 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun,
case dict:fetch(AckTag, PA) of
#msg_status {} ->
false;
- {IsPersistent, _Guid, _MsgProps} ->
+ {IsPersistent, _MsgId, _MsgProps} ->
IsPersistent
end];
false -> []
@@ -1214,38 +1215,38 @@ purge_betas_and_deltas(LensByStore,
end.
remove_queue_entries(Fold, Q, LensByStore, IndexState, MSCState) ->
- {GuidsByStore, Delivers, Acks} =
+ {MsgIdsByStore, Delivers, Acks} =
Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q),
- ok = orddict:fold(fun (IsPersistent, Guids, ok) ->
- msg_store_remove(MSCState, IsPersistent, Guids)
- end, ok, GuidsByStore),
- {sum_guids_by_store_to_len(LensByStore, GuidsByStore),
+ ok = orddict:fold(fun (IsPersistent, MsgIds, ok) ->
+ msg_store_remove(MSCState, IsPersistent, MsgIds)
+ end, ok, MsgIdsByStore),
+ {sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore),
rabbit_queue_index:ack(Acks,
rabbit_queue_index:deliver(Delivers, IndexState))}.
remove_queue_entries1(
- #msg_status { guid = Guid, seq_id = SeqId,
+ #msg_status { msg_id = MsgId, seq_id = SeqId,
is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
index_on_disk = IndexOnDisk, is_persistent = IsPersistent },
- {GuidsByStore, Delivers, Acks}) ->
+ {MsgIdsByStore, Delivers, Acks}) ->
{case MsgOnDisk of
- true -> rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore);
- false -> GuidsByStore
+ true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
+ false -> MsgIdsByStore
end,
cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
cons_if(IndexOnDisk, SeqId, Acks)}.
-sum_guids_by_store_to_len(LensByStore, GuidsByStore) ->
+sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore) ->
orddict:fold(
- fun (IsPersistent, Guids, LensByStore1) ->
- orddict:update_counter(IsPersistent, length(Guids), LensByStore1)
- end, LensByStore, GuidsByStore).
+ fun (IsPersistent, MsgIds, LensByStore1) ->
+ orddict:update_counter(IsPersistent, length(MsgIds), LensByStore1)
+ end, LensByStore, MsgIdsByStore).
%%----------------------------------------------------------------------------
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------
-publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
+publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
MsgProps = #message_properties { needs_confirming = NeedsConfirming },
IsDelivered, MsgOnDisk,
State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
@@ -1265,7 +1266,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) }
end,
PCount1 = PCount + one_if(IsPersistent1),
- UC1 = gb_sets_maybe_insert(NeedsConfirming, Guid, UC),
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
{SeqId, State2 #vqstate { next_seq_id = SeqId + 1,
len = Len + 1,
in_counter = InCount + 1,
@@ -1277,14 +1278,14 @@ maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
msg_on_disk = true }, _MSCState) ->
MsgStatus;
maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
- msg = Msg, guid = Guid,
+ msg = Msg, msg_id = MsgId,
is_persistent = IsPersistent }, MSCState)
when Force orelse IsPersistent ->
Msg1 = Msg #basic_message {
%% don't persist any recoverable decoded properties
content = rabbit_binary_parser:clear_decoded_content(
Msg #basic_message.content)},
- ok = msg_store_write(MSCState, IsPersistent, Guid, Msg1),
+ ok = msg_store_write(MSCState, IsPersistent, MsgId, Msg1),
MsgStatus #msg_status { msg_on_disk = true };
maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) ->
MsgStatus.
@@ -1294,7 +1295,7 @@ maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
{MsgStatus, IndexState};
maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
- guid = Guid,
+ msg_id = MsgId,
seq_id = SeqId,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
@@ -1302,7 +1303,7 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
when Force orelse IsPersistent ->
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
IndexState1 = rabbit_queue_index:publish(
- Guid, SeqId, MsgProps, IsPersistent, IndexState),
+ MsgId, SeqId, MsgProps, IsPersistent, IndexState),
{MsgStatus #msg_status { index_on_disk = true },
maybe_write_delivered(IsDelivered, SeqId, IndexState1)};
maybe_write_index_to_disk(_Force, MsgStatus, IndexState) ->
@@ -1321,7 +1322,7 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
%%----------------------------------------------------------------------------
record_pending_ack(#msg_status { seq_id = SeqId,
- guid = Guid,
+ msg_id = MsgId,
is_persistent = IsPersistent,
msg_on_disk = MsgOnDisk,
msg_props = MsgProps } = MsgStatus,
@@ -1330,8 +1331,8 @@ record_pending_ack(#msg_status { seq_id = SeqId,
ack_in_counter = AckInCount}) ->
{AckEntry, RAI1} =
case MsgOnDisk of
- true -> {{IsPersistent, Guid, MsgProps}, RAI};
- false -> {MsgStatus, gb_trees:insert(SeqId, Guid, RAI)}
+ true -> {{IsPersistent, MsgId, MsgProps}, RAI};
+ false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)}
end,
PA1 = dict:store(SeqId, AckEntry, PA),
State #vqstate { pending_ack = PA1,
@@ -1342,28 +1343,28 @@ remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
- {PersistentSeqIds, GuidsByStore} =
+ {PersistentSeqIds, MsgIdsByStore} =
dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA),
State1 = State #vqstate { pending_ack = dict:new(),
ram_ack_index = gb_trees:empty() },
case KeepPersistent of
- true -> case orddict:find(false, GuidsByStore) of
- error -> State1;
- {ok, Guids} -> ok = msg_store_remove(MSCState, false,
- Guids),
+ true -> case orddict:find(false, MsgIdsByStore) of
+ error -> State1;
+ {ok, MsgIds} -> ok = msg_store_remove(MSCState, false,
+ MsgIds),
State1
end;
false -> IndexState1 =
rabbit_queue_index:ack(PersistentSeqIds, IndexState),
- [ok = msg_store_remove(MSCState, IsPersistent, Guids)
- || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)],
+ [ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
+ || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
State1 #vqstate { index_state = IndexState1 }
end.
ack(_MsgStoreFun, _Fun, [], State) ->
State;
ack(MsgStoreFun, Fun, AckTags, State) ->
- {{PersistentSeqIds, GuidsByStore},
+ {{PersistentSeqIds, MsgIdsByStore},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
persistent_count = PCount,
@@ -1379,10 +1380,10 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
gb_trees:delete_any(SeqId, RAI)})}
end, {accumulate_ack_init(), State}, AckTags),
IndexState1 = rabbit_queue_index:ack(PersistentSeqIds, IndexState),
- [ok = MsgStoreFun(MSCState, IsPersistent, Guids)
- || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)],
- PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
- orddict:new(), GuidsByStore)),
+ [ok = MsgStoreFun(MSCState, IsPersistent, MsgIds)
+ || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
+ PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len(
+ orddict:new(), MsgIdsByStore)),
State1 #vqstate { index_state = IndexState1,
persistent_count = PCount1,
ack_out_counter = AckOutCount + length(AckTags) }.
@@ -1392,12 +1393,12 @@ accumulate_ack_init() -> {[], orddict:new()}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
index_on_disk = false },
- {PersistentSeqIdsAcc, GuidsByStore}) ->
- {PersistentSeqIdsAcc, GuidsByStore};
-accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps},
- {PersistentSeqIdsAcc, GuidsByStore}) ->
+ {PersistentSeqIdsAcc, MsgIdsByStore}) ->
+ {PersistentSeqIdsAcc, MsgIdsByStore};
+accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps},
+ {PersistentSeqIdsAcc, MsgIdsByStore}) ->
{cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc),
- rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore)}.
+ rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore)}.
find_persistent_count(LensByStore) ->
case orddict:find(true, LensByStore) of
@@ -1416,12 +1417,12 @@ confirm_commit_index(State = #vqstate { index_state = IndexState }) ->
false -> State
end.
-remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD,
+remove_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->
- State #vqstate { msgs_on_disk = gb_sets:difference(MOD, GuidSet),
- msg_indices_on_disk = gb_sets:difference(MIOD, GuidSet),
- unconfirmed = gb_sets:difference(UC, GuidSet) }.
+ State #vqstate { msgs_on_disk = gb_sets:difference(MOD, MsgIdSet),
+ msg_indices_on_disk = gb_sets:difference(MIOD, MsgIdSet),
+ unconfirmed = gb_sets:difference(UC, MsgIdSet) }.
needs_index_sync(#vqstate { msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->
@@ -1438,37 +1439,37 @@ needs_index_sync(#vqstate { msg_indices_on_disk = MIOD,
%% subtraction.
not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)).
-msgs_confirmed(GuidSet, State) ->
- {gb_sets:to_list(GuidSet), remove_confirms(GuidSet, State)}.
+msgs_confirmed(MsgIdSet, State) ->
+ {gb_sets:to_list(MsgIdSet), remove_confirms(MsgIdSet, State)}.
-blind_confirm(QPid, GuidSet) ->
+blind_confirm(QPid, MsgIdSet) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun (State) -> msgs_confirmed(GuidSet, State) end).
+ QPid, fun (State) -> msgs_confirmed(MsgIdSet, State) end).
-msgs_written_to_disk(QPid, GuidSet, removed) ->
- blind_confirm(QPid, GuidSet);
-msgs_written_to_disk(QPid, GuidSet, written) ->
+msgs_written_to_disk(QPid, MsgIdSet, removed) ->
+ blind_confirm(QPid, MsgIdSet);
+msgs_written_to_disk(QPid, MsgIdSet, written) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
QPid, fun (State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->
- msgs_confirmed(gb_sets:intersection(GuidSet, MIOD),
+ Written = gb_sets:intersection(UC, MsgIdSet),
+ msgs_confirmed(gb_sets:intersection(MsgIdSet, MIOD),
State #vqstate {
msgs_on_disk =
- gb_sets:union(
- MOD, gb_sets:intersection(UC, GuidSet)) })
+ gb_sets:union(MOD, Written) })
end).
-msg_indices_written_to_disk(QPid, GuidSet) ->
+msg_indices_written_to_disk(QPid, MsgIdSet) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
QPid, fun (State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->
- msgs_confirmed(gb_sets:intersection(GuidSet, MOD),
+ Written = gb_sets:intersection(UC, MsgIdSet),
+ msgs_confirmed(gb_sets:intersection(MsgIdSet, MOD),
State #vqstate {
msg_indices_on_disk =
- gb_sets:union(
- MIOD, gb_sets:intersection(UC, GuidSet)) })
+ gb_sets:union(MIOD, Written) })
end).
%%----------------------------------------------------------------------------
@@ -1546,17 +1547,16 @@ limit_ram_acks(Quota, State = #vqstate { pending_ack = PA,
true ->
{Quota, State};
false ->
- {SeqId, Guid, RAI1} = gb_trees:take_largest(RAI),
+ {SeqId, MsgId, RAI1} = gb_trees:take_largest(RAI),
MsgStatus = #msg_status {
- guid = Guid, %% ASSERTION
+ msg_id = MsgId, %% ASSERTION
is_persistent = false, %% ASSERTION
msg_props = MsgProps } = dict:fetch(SeqId, PA),
{_, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
+ PA1 = dict:store(SeqId, {false, MsgId, MsgProps}, PA),
limit_ram_acks(Quota - 1,
- State1 #vqstate {
- pending_ack =
- dict:store(SeqId, {false, Guid, MsgProps}, PA),
- ram_ack_index = RAI1 })
+ State1 #vqstate { pending_ack = PA1,
+ ram_ack_index = RAI1 })
end.
@@ -1817,9 +1817,9 @@ push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) ->
multiple_routing_keys() ->
transform_storage(
fun ({basic_message, ExchangeName, Routing_Key, Content,
- Guid, Persistent}) ->
+ MsgId, Persistent}) ->
{ok, {basic_message, ExchangeName, [Routing_Key], Content,
- Guid, Persistent}};
+ MsgId, Persistent}};
(_) -> {error, corrupt_message}
end),
ok.