summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-10-23 08:20:08 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-10-23 08:20:08 +0100
commite5e4c8942efecc455e2a72c5435bc6bc535a742e (patch)
tree64bd2d6fc20e73ed47ecb48f918b24ea3fb76df4
parent2816ffa027a523a9ff2de052cb834a706e8068c5 (diff)
downloadrabbitmq-server-e5e4c8942efecc455e2a72c5435bc6bc535a742e.tar.gz
cosmetics and minor refactoring on code from recently merged bug23111
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_amqqueue_process.erl51
-rw-r--r--src/rabbit_invariable_queue.erl10
-rw-r--r--src/rabbit_queue_index.erl36
-rw-r--r--src/rabbit_types.erl6
-rw-r--r--src/rabbit_variable_queue.erl112
6 files changed, 95 insertions, 122 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index bc2b5ac9..9d78bafa 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -311,7 +311,7 @@ check_declare_arguments(QueueName, Args) ->
"invalid arg '~s' for ~s: ~w",
[Key, rabbit_misc:rs(QueueName), Error])
end || {Key, Fun} <-
- [{<<"x-expires">>, fun check_expires_argument/1},
+ [{<<"x-expires">>, fun check_expires_argument/1},
{<<"x-message-ttl">>, fun check_message_ttl_argument/1}]],
ok.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e9b70a8f..6048920e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -150,22 +150,6 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-init_queue_state(State) ->
- lists:foldl(fun(F, S) -> F(S) end, State,
- [fun init_expires/1, fun init_ttl/1]).
-
-init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) ->
- case rabbit_misc:table_lookup(Arguments, <<"x-expires">>) of
- {_Type, Expires} -> ensure_expiry_timer(State#q{expires = Expires});
- undefined -> State
- end.
-
-init_ttl(State = #q{q = #amqqueue{arguments = Arguments}}) ->
- case rabbit_misc:table_lookup(Arguments, <<"x-message-ttl">>) of
- {_Type, TTL} -> drop_expired_messages(State#q{ttl = TTL});
- undefined -> State
- end.
-
declare(Recover, From,
State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable},
backing_queue = BQ, backing_queue_state = undefined,
@@ -180,8 +164,7 @@ declare(Recover, From,
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
BQS = BQ:init(QName, IsDurable, Recover),
- State1 = init_queue_state(
- State#q{backing_queue_state = BQS}),
+ State1 = process_args(State#q{backing_queue_state = BQS}),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(StatsTimer,
@@ -190,6 +173,19 @@ declare(Recover, From,
Q1 -> {stop, normal, {existing, Q1}, State}
end.
+process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
+ lists:foldl(fun({Arg, Fun}, State1) ->
+ case rabbit_misc:table_lookup(Arguments, Arg) of
+ {_Type, Val} -> Fun(Val, State1);
+ undefined -> State1
+ end
+ end, State, [{<<"x-expires">>, fun init_expires/2},
+ {<<"x-message-ttl">>, fun init_ttl/2}]).
+
+init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
+
+init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}).
+
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
stop_sync_timer(stop_rate_timer(State)),
@@ -588,10 +584,8 @@ reset_msg_expiry_fun(TTL) ->
message_properties(#q{ttl=TTL}) ->
#message_properties{expiry = calculate_msg_expiry(TTL)}.
-calculate_msg_expiry(undefined) ->
- undefined;
-calculate_msg_expiry(TTL) ->
- now_millis() + (TTL * 1000).
+calculate_msg_expiry(undefined) -> undefined;
+calculate_msg_expiry(TTL) -> now_millis() + (TTL * 1000).
drop_expired_messages(State = #q{ttl = undefined}) ->
State;
@@ -610,18 +604,15 @@ ensure_ttl_timer(State = #q{backing_queue = BQ,
ttl_timer_ref = undefined})
when TTL =/= undefined ->
case BQ:is_empty(BQS) of
- true ->
- State;
- false ->
- State#q{ttl_timer_ref =
- timer:apply_after(TTL, rabbit_amqqueue,
- drop_expired, [self()])}
+ true -> State;
+ false -> TRef = timer:apply_after(TTL, rabbit_amqqueue, drop_expired,
+ [self()]),
+ State#q{ttl_timer_ref = TRef}
end;
ensure_ttl_timer(State) ->
State.
-now_millis() ->
- timer:now_diff(now(), {0,0,0}).
+now_millis() -> timer:now_diff(now(), {0,0,0}).
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index ad01d8f7..9855f18c 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -122,12 +122,10 @@ dropwhile(_Pred, State = #iv_state { len = 0 }) ->
dropwhile(Pred, State = #iv_state { queue = Q }) ->
{{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q),
case Pred(MsgProps) of
- true ->
- {_, State1} =
- fetch_internal(false, Q1, Msg, MsgProps, IsDelivered, State),
- dropwhile(Pred, State1);
- false ->
- State
+ true -> {_, State1} = fetch_internal(false, Q1, Msg, MsgProps,
+ IsDelivered, State),
+ dropwhile(Pred, State1);
+ false -> State
end.
fetch(_AckRequired, State = #iv_state { len = 0 }) ->
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index f84dff83..1b837128 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -98,7 +98,7 @@
%% and seeding the message store on start up.
%%
%% Note that in general, the representation of a message's state as
-%% the tuple: {('no_pub'|{Guid, MsgProperties, IsPersistent}),
+%% the tuple: {('no_pub'|{Guid, MsgProps, IsPersistent}),
%% ('del'|'no_del'), ('ack'|'no_ack')} is richer than strictly
%% necessary for most operations. However, for startup, and to ensure
%% the safe and correct combination of journal entries with entries
@@ -162,7 +162,7 @@
%% ---- misc ----
--define(PUB, {_, _, _}). %% {Guid, MsgProperties, IsPersistent}
+-define(PUB, {_, _, _}). %% {Guid, MsgProps, IsPersistent}
-define(READ_MODE, [binary, raw, read]).
-define(READ_AHEAD_MODE, [{read_ahead, ?SEGMENT_TOTAL_SIZE} | ?READ_MODE]).
@@ -259,8 +259,7 @@ delete_and_terminate(State) ->
ok = rabbit_misc:recursive_delete([Dir]),
State1.
-publish(Guid, SeqId, MsgProperties, IsPersistent, State)
- when is_binary(Guid) ->
+publish(Guid, SeqId, MsgProps, IsPersistent, State) when is_binary(Guid) ->
?GUID_BYTES = size(Guid),
{JournalHdl, State1} = get_journal_handle(State),
ok = file_handle_cache:append(
@@ -269,9 +268,9 @@ publish(Guid, SeqId, MsgProperties, IsPersistent, State)
false -> ?PUB_TRANS_JPREFIX
end):?JPREFIX_BITS,
SeqId:?SEQ_BITS>>,
- create_pub_record_body(Guid, MsgProperties)]),
+ create_pub_record_body(Guid, MsgProps)]),
maybe_flush_journal(
- add_to_journal(SeqId, {Guid, MsgProperties, IsPersistent}, State1)).
+ add_to_journal(SeqId, {Guid, MsgProps, IsPersistent}, State1)).
deliver(SeqIds, State) ->
deliver_or_ack(del, SeqIds, State).
@@ -464,9 +463,7 @@ recover_segment(ContainsCheckFun, CleanShutdown,
{SegEntries1, UnackedCountDelta} =
segment_plus_journal(SegEntries, JEntries),
array:sparse_foldl(
- fun (RelSeq,
- {{Guid, _MsgProperties, _IsPersistent}, Del, no_ack},
- Segment1) ->
+ fun (RelSeq, {{Guid, _MsgProps, _IsPersistent}, Del, no_ack}, Segment1) ->
recover_message(ContainsCheckFun(Guid), CleanShutdown,
Del, RelSeq, Segment1)
end,
@@ -519,8 +516,7 @@ queue_index_walker_reader(QueueName, Gatherer) ->
State = #qistate { segments = Segments, dir = Dir } =
recover_journal(blank_state(QueueName)),
[ok = segment_entries_foldr(
- fun (_RelSeq,
- {{Guid, _MsgProps, true}, _IsDelivered, no_ack},
+ fun (_RelSeq, {{Guid, _MsgProps, true}, _IsDelivered, no_ack},
ok) ->
gatherer:in(Gatherer, {Guid, 1});
(_RelSeq, _Value, Acc) ->
@@ -678,8 +674,8 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
load_journal_entries(add_to_journal(SeqId, ack, State));
_ ->
case read_pub_record_body(Hdl) of
- {Guid, MsgProperties} ->
- Publish = {Guid, MsgProperties,
+ {Guid, MsgProps} ->
+ Publish = {Guid, MsgProps,
case Prefix of
?PUB_PERSIST_JPREFIX -> true;
?PUB_TRANS_JPREFIX -> false
@@ -781,12 +777,12 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
ok = case Pub of
no_pub ->
ok;
- {Guid, MsgProperties, IsPersistent} ->
+ {Guid, MsgProps, IsPersistent} ->
file_handle_cache:append(
Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
(bool_to_int(IsPersistent)):1,
RelSeq:?REL_SEQ_BITS>>,
- create_pub_record_body(Guid, MsgProperties)])
+ create_pub_record_body(Guid, MsgProps)])
end,
ok = case {Del, Ack} of
{no_del, no_ack} ->
@@ -806,12 +802,10 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq},
{Messages, Segments}, Dir) ->
Segment = segment_find_or_new(Seg, Dir, Segments),
{segment_entries_foldr(
- fun (RelSeq,
- {{Guid, MsgProperties, IsPersistent}, IsDelivered, no_ack},
- Acc)
+ fun (RelSeq, {{Guid, MsgProps, IsPersistent}, IsDelivered, no_ack}, Acc)
when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso
(Seg < EndSeg orelse EndRelSeq >= RelSeq) ->
- [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), MsgProperties,
+ [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), MsgProps,
IsPersistent, IsDelivered == del} | Acc ];
(_RelSeq, _Value, Acc) ->
Acc
@@ -841,8 +835,8 @@ load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) ->
case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of
{ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} ->
- {Guid, MsgProperties} = read_pub_record_body(Hdl),
- Obj = {{Guid, MsgProperties, 1 == IsPersistentNum}, no_del, no_ack},
+ {Guid, MsgProps} = read_pub_record_body(Hdl),
+ Obj = {{Guid, MsgProps, 1 == IsPersistentNum}, no_del, no_ack},
SegEntries1 = array:set(RelSeq, Obj, SegEntries),
load_segment_entries(KeepAcked, Hdl, SegEntries1,
UnackedCount + 1);
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index d625bda1..91f2c4ca 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -37,13 +37,13 @@
-export_type([txn/0, maybe/1, info/0, info_key/0, message/0, basic_message/0,
delivery/0, content/0, decoded_content/0, undecoded_content/0,
- unencoded_content/0, encoded_content/0, vhost/0, ctag/0,
- amqp_error/0, r/1, r2/2, r3/3, listener/0,
+ unencoded_content/0, encoded_content/0, message_properties/0,
+ vhost/0, ctag/0, amqp_error/0, r/1, r2/2, r3/3, listener/0,
binding/0, binding_source/0, binding_destination/0,
amqqueue/0, exchange/0,
connection/0, protocol/0, user/0, ok/1, error/1, ok_or_error/1,
ok_or_error2/2, ok_pid_or_error/0, channel_exit/0,
- connection_exit/0, message_properties/0]).
+ connection_exit/0]).
-type(channel_exit() :: no_return()).
-type(connection_exit() :: no_return()).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 0b948c1b..1395c77f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -249,7 +249,7 @@
is_delivered,
msg_on_disk,
index_on_disk,
- msg_properties
+ msg_props
}).
-record(delta,
@@ -499,8 +499,8 @@ purge(State = #vqstate { q4 = Q4,
ram_index_count = 0,
persistent_count = PCount1 })}.
-publish(Msg, MsgProperties, State) ->
- {_SeqId, State1} = publish(Msg, MsgProperties, false, false, State),
+publish(Msg, MsgProps, State) ->
+ {_SeqId, State1} = publish(Msg, MsgProps, false, false, State),
a(reduce_memory_use(State1)).
publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) ->
@@ -532,7 +532,7 @@ dropwhile(Pred, State) ->
dropwhile1(Pred, State) ->
internal_queue_out(
- fun(MsgStatus = #msg_status { msg_properties = MsgProps }, State1) ->
+ fun(MsgStatus = #msg_status { msg_props = MsgProps }, State1) ->
case Pred(MsgProps) of
true ->
{_, State2} = internal_fetch(false, MsgStatus, State1),
@@ -634,14 +634,13 @@ ack(AckTags, State) ->
fun (_AckEntry, State1) -> State1 end,
AckTags, State)).
-tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent },
- MsgProperties,
+tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps,
State = #vqstate { durable = IsDurable,
msg_store_clients = MSCState }) ->
Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
- store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProperties} | Pubs] }),
+ store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }),
a(case IsPersistent andalso IsDurable of
- true -> MsgStatus = msg_status(true, undefined, Msg, MsgProperties),
+ true -> MsgStatus = msg_status(true, undefined, Msg, MsgProps),
{#msg_status { msg_on_disk = true }, MSCState1} =
maybe_write_msg_to_disk(false, MsgStatus, MSCState),
State #vqstate { msg_store_clients = MSCState1 };
@@ -683,18 +682,16 @@ tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable }) ->
requeue(AckTags, MsgPropsFun, State) ->
a(reduce_memory_use(
ack(fun rabbit_msg_store:release/2,
- fun (#msg_status { msg = Msg,
- msg_properties = MsgProperties }, State1) ->
- {_SeqId, State2} =
- publish(Msg, MsgPropsFun(MsgProperties), true,
- false, State1),
+ fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) ->
+ {_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps),
+ true, false, State1),
State2;
- ({IsPersistent, Guid, MsgProperties}, State1) ->
+ ({IsPersistent, Guid, MsgProps}, State1) ->
#vqstate { msg_store_clients = MSCState } = State1,
{{ok, Msg = #basic_message{}}, MSCState1} =
read_from_msg_store(MSCState, IsPersistent, Guid),
State2 = State1 #vqstate { msg_store_clients = MSCState1 },
- {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProperties),
+ {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProps),
true, true, State2),
State3
end,
@@ -844,11 +841,11 @@ cons_if(true, E, L) -> [E | L];
cons_if(false, _E, L) -> L.
msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid },
- MsgProperties) ->
+ MsgProps) ->
#msg_status { seq_id = SeqId, guid = Guid, msg = Msg,
is_persistent = IsPersistent, is_delivered = false,
msg_on_disk = false, index_on_disk = false,
- msg_properties = MsgProperties }.
+ msg_props = MsgProps }.
find_msg_store(true) -> ?PERSISTENT_MSG_STORE;
find_msg_store(false) -> ?TRANSIENT_MSG_STORE.
@@ -883,26 +880,26 @@ store_tx(Txn, Tx) -> put({txn, Txn}, Tx).
erase_tx(Txn) -> erase({txn, Txn}).
persistent_guids(Pubs) ->
- [Guid || {#basic_message { guid = Guid, is_persistent = true }, _MsgProps}
- <- Pubs].
+ [Guid || {#basic_message { guid = Guid,
+ is_persistent = true }, _MsgProps} <- Pubs].
betas_from_index_entries(List, TransientThreshold, IndexState) ->
{Filtered, Delivers, Acks} =
lists:foldr(
- fun ({Guid, SeqId, MsgProperties, IsPersistent, IsDelivered},
+ fun ({Guid, SeqId, MsgProps, IsPersistent, IsDelivered},
{Filtered1, Delivers1, Acks1}) ->
case SeqId < TransientThreshold andalso not IsPersistent of
true -> {Filtered1,
cons_if(not IsDelivered, SeqId, Delivers1),
[SeqId | Acks1]};
- false -> {[m(#msg_status { msg = undefined,
- guid = Guid,
- seq_id = SeqId,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_on_disk = true,
- index_on_disk = true,
- msg_properties = MsgProperties
+ false -> {[m(#msg_status { msg = undefined,
+ guid = Guid,
+ seq_id = SeqId,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = true,
+ index_on_disk = true,
+ msg_props = MsgProps
}) | Filtered1],
Delivers1,
Acks1}
@@ -978,9 +975,10 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun,
case IsDurable of
true -> [AckTag || AckTag <- AckTags,
case dict:fetch(AckTag, PA) of
- #msg_status {} -> false;
- {IsPersistent,
- _Guid, _MsgProps} -> IsPersistent
+ #msg_status {} ->
+ false;
+ {IsPersistent, _Guid, _MsgProps} ->
+ IsPersistent
end];
false -> []
end,
@@ -1011,21 +1009,19 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
durable = IsDurable }) ->
PAcks = lists:append(SPAcks),
Acks = lists:append(SAcks),
- Pubs = lists:foldl(
- fun({Fun, PubsN}, OuterAcc) ->
- lists:foldl(
- fun({Msg, MsgProps}, Acc) ->
- [{Msg, Fun(MsgProps)} | Acc]
- end, OuterAcc, PubsN)
- end, [], SPubs),
+ Pubs = lists:foldl(fun({Fun, PubsN}, OuterAcc) ->
+ lists:foldl(fun({Msg, MsgProps}, Acc) ->
+ [{Msg, Fun(MsgProps)} | Acc]
+ end, OuterAcc, PubsN)
+ end, [], SPubs),
{SeqIds, State1 = #vqstate { index_state = IndexState }} =
lists:foldl(
fun ({Msg = #basic_message { is_persistent = IsPersistent },
- MsgProperties},
+ MsgProps},
{SeqIdsAcc, State2}) ->
IsPersistent1 = IsDurable andalso IsPersistent,
{SeqId, State3} =
- publish(Msg, MsgProperties, false, IsPersistent1, State2),
+ publish(Msg, MsgProps, false, IsPersistent1, State2),
{cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3}
end, {PAcks, ack(Acks, State)}, Pubs),
IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
@@ -1082,7 +1078,7 @@ sum_guids_by_store_to_len(LensByStore, GuidsByStore) ->
%%----------------------------------------------------------------------------
publish(Msg = #basic_message { is_persistent = IsPersistent },
- MsgProperties, IsDelivered, MsgOnDisk,
+ MsgProps, IsDelivered, MsgOnDisk,
State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
next_seq_id = SeqId,
len = Len,
@@ -1091,7 +1087,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent },
durable = IsDurable,
ram_msg_count = RamMsgCount }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProperties))
+ MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
#msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk},
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = case bpqueue:is_empty(Q3) of
@@ -1131,19 +1127,15 @@ maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
{MsgStatus, IndexState};
maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
- guid = Guid,
- seq_id = SeqId,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_properties = MsgProperties},
- IndexState)
+ guid = Guid,
+ seq_id = SeqId,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_props = MsgProps}, IndexState)
when Force orelse IsPersistent ->
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
- IndexState1 = rabbit_queue_index:publish(Guid,
- SeqId,
- MsgProperties,
- IsPersistent,
- IndexState),
+ IndexState1 = rabbit_queue_index:publish(
+ Guid, SeqId, MsgProps, IsPersistent, IndexState),
{MsgStatus #msg_status { index_on_disk = true },
maybe_write_delivered(IsDelivered, SeqId, IndexState1)};
maybe_write_index_to_disk(_Force, MsgStatus, IndexState) ->
@@ -1163,13 +1155,13 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
%% Internal gubbins for acks
%%----------------------------------------------------------------------------
-record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId,
+record_pending_ack(#msg_status { seq_id = SeqId,
+ guid = Guid,
is_persistent = IsPersistent,
- msg_on_disk = MsgOnDisk,
- msg_properties = MsgProperties } = MsgStatus,
- PA) ->
+ msg_on_disk = MsgOnDisk,
+ msg_props = MsgProps } = MsgStatus, PA) ->
AckEntry = case MsgOnDisk of
- true -> {IsPersistent, Guid, MsgProperties};
+ true -> {IsPersistent, Guid, MsgProps};
false -> MsgStatus
end,
dict:store(SeqId, AckEntry, PA).
@@ -1220,9 +1212,7 @@ accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
index_on_disk = false }, Acc) ->
Acc;
-accumulate_ack(SeqId,
- {IsPersistent, Guid, _MsgProperties},
- {SeqIdsAcc, Dict}) ->
+accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps}, {SeqIdsAcc, Dict}) ->
{cons_if(IsPersistent, SeqId, SeqIdsAcc),
rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}.