summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-10-06 15:29:53 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-10-06 15:29:53 +0100
commita692dd448ef31fd8f58160a1ba8cbe743e16d2ed (patch)
treec60f15e265bedb532557be235f994559af67c974
parent3f050dad0471e3bb71123f45290f3da7ce751f9d (diff)
downloadrabbitmq-server-a692dd448ef31fd8f58160a1ba8cbe743e16d2ed.tar.gz
msg_properties -> message_properties in order to be consistent with message and basic_message (though within vq, we have plenty of msg_-prefixes so don't bother inside the msg_status record in there). Also, tidied up a lot of trailing whitespace
-rw-r--r--include/rabbit.hrl2
-rw-r--r--include/rabbit_backing_queue_spec.hrl29
-rw-r--r--src/rabbit_amqqueue_process.erl66
-rw-r--r--src/rabbit_persister.erl3
-rw-r--r--src/rabbit_queue_index.erl33
-rw-r--r--src/rabbit_tests.erl22
-rw-r--r--src/rabbit_types.erl6
-rw-r--r--src/rabbit_variable_queue.erl76
8 files changed, 120 insertions, 117 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 0f0a0e87..f924878d 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -74,7 +74,7 @@
-record(event, {type, props, timestamp}).
--record(msg_properties, {expiry}).
+-record(message_properties, {expiry}).
%%----------------------------------------------------------------------------
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 12d32363..dcff5b37 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -34,13 +34,14 @@
%% Message, IsDelivered, AckTag, Remaining_Len
{rabbit_types:basic_message(), boolean(), ack(), non_neg_integer()})).
-type(peek_result() :: ('empty'|{rabbit_types:basic_message(),
- rabbit_types:msg_properties()})).
+ rabbit_types:message_properties()})).
-type(is_durable() :: boolean()).
-type(attempt_recovery() :: boolean()).
-type(purged_msg_count() :: non_neg_integer()).
-type(ack_required() :: boolean()).
--type(msg_properties_transformer() ::
- fun ((rabbit_types:msg_properties()) -> rabbit_types:msg_properties())).
+-type(message_properties_transformer() ::
+ fun ((rabbit_types:message_properties())
+ -> rabbit_types:message_properties())).
-spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok').
-spec(stop/0 :: () -> 'ok').
@@ -49,26 +50,26 @@
-spec(terminate/1 :: (state()) -> state()).
-spec(delete_and_terminate/1 :: (state()) -> state()).
-spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
--spec(publish/3 ::
- (rabbit_types:basic_message(), rabbit_types:msg_properties(), state())
- -> state()).
+-spec(publish/3 :: (rabbit_types:basic_message(),
+ rabbit_types:message_properties_properties(), state())
+ -> state()).
-spec(publish_delivered/4 :: (ack_required(), rabbit_types:basic_message(),
- rabbit_types:msg_properties(), state())
- -> {ack(), state()}).
+ rabbit_types:message_properties(), state())
+ -> {ack(), state()}).
-spec(dropwhile/2 ::
- (fun ((rabbit_types:msg_properties()) -> boolean()), state())
+ (fun ((rabbit_types:message_properties()) -> boolean()), state())
-> state()).
-spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}).
-spec(ack/2 :: ([ack()], state()) -> state()).
--spec(tx_publish/4 ::
- (rabbit_types:txn(), rabbit_types:basic_message(),
- rabbit_types:msg_properties(), state()) -> state()).
+-spec(tx_publish/4 :: (rabbit_types:txn(), rabbit_types:basic_message(),
+ rabbit_types:message_properties(), state()) -> state()).
-spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()).
-spec(tx_rollback/2 :: (rabbit_types:txn(), state()) -> {[ack()], state()}).
-spec(tx_commit/4 ::
(rabbit_types:txn(), fun (() -> any()),
- msg_properties_transformer(), state()) -> {[ack()], state()}).
--spec(requeue/3 :: ([ack()], msg_properties_transformer(), state()) -> state()).
+ message_properties_transformer(), state()) -> {[ack()], state()}).
+-spec(requeue/3 :: ([ack()], message_properties_transformer(), state())
+ -> state()).
-spec(len/1 :: (state()) -> non_neg_integer()).
-spec(is_empty/1 :: (state()) -> boolean()).
-spec(set_ram_duration_target/2 ::
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 53b98490..91d3f586 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -39,7 +39,7 @@
-define(SYNC_INTERVAL, 5). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
--define(BASE_MSG_PROPERTIES, #msg_properties{expiry = undefined}).
+-define(BASE_MESSAGE_PROPERTIES, #message_properties{expiry = undefined}).
-export([start_link/1, info_keys/0]).
@@ -152,7 +152,7 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
init_queue_state(State) ->
- lists:foldl(fun(F, S) -> F(S) end, State,
+ lists:foldl(fun(F, S) -> F(S) end, State,
[fun init_expires/1, fun init_ttl/1]).
init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) ->
@@ -413,7 +413,7 @@ deliver_from_queue_deliver(AckRequired, false, State) ->
run_message_queue(State) ->
Funs = {fun deliver_from_queue_pred/2,
fun deliver_from_queue_deliver/3},
- State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
drop_expired_messages(State),
IsEmpty = BQ:is_empty(BQS),
{_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1),
@@ -424,17 +424,18 @@ attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) ->
DeliverFun =
fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) ->
{AckTag, BQS1} =
- BQ:publish_delivered(AckRequired, Message,
- #msg_properties{}, BQS),
+ BQ:publish_delivered(AckRequired, Message,
+ #message_properties{}, BQS),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
end,
deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State);
-attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ,
+attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
record_current_channel_tx(ChPid, Txn),
- {true, State#q{backing_queue_state =
- BQ:tx_publish(Txn, Message, #msg_properties{}, BQS)}}.
+ {true,
+ State#q{backing_queue_state =
+ BQ:tx_publish(Txn, Message, #message_properties{}, BQS)}}.
deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
case attempt_delivery(Txn, ChPid, Message, State) of
@@ -442,25 +443,25 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
{true, NewState};
{false, NewState} ->
%% Txn is none and no unblocked channels with consumers
- BQS = BQ:publish(Message,
- msg_properties(State),
+ BQS = BQ:publish(Message,
+ message_properties(State),
State #q.backing_queue_state),
{false, ensure_ttl_timer(NewState#q{backing_queue_state = BQS})}
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
maybe_run_queue_via_backing_queue(
- fun (BQS) ->
+ fun (BQS) ->
BQ:requeue(AckTags, reset_msg_expiry_fun(State), BQS)
end, State).
-fetch(AckRequired, State = #q{backing_queue_state = BQS,
+fetch(AckRequired, State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
case BQ:fetch(AckRequired, BQS) of
- {empty, BQS1} ->
+ {empty, BQS1} ->
{empty, State#q{backing_queue_state = BQS1}};
{{Message, IsDelivered, AckTag, Remaining}, BQS1} ->
- {{Message, IsDelivered, AckTag, Remaining},
+ {{Message, IsDelivered, AckTag, Remaining},
State#q{backing_queue_state = BQS1}}
end.
@@ -559,9 +560,9 @@ maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- {AckTags, BQS1} = BQ:tx_commit(Txn,
- fun () -> gen_server2:reply(From, ok) end,
- reset_msg_expiry_fun(State),
+ {AckTags, BQS1} = BQ:tx_commit(Txn,
+ fun () -> gen_server2:reply(From, ok) end,
+ reset_msg_expiry_fun(State),
BQS),
%% ChPid must be known here because of the participant management
%% by the channel.
@@ -583,38 +584,38 @@ subtract_acks(A, B) when is_list(B) ->
reset_msg_expiry_fun(State) ->
fun(MsgProps) ->
- MsgProps#msg_properties{expiry=calculate_msg_expiry(State)}
+ MsgProps#message_properties{expiry = calculate_msg_expiry(State)}
end.
-msg_properties(State) ->
- #msg_properties{expiry = calculate_msg_expiry(State)}.
+message_properties(State) ->
+ #message_properties{expiry = calculate_msg_expiry(State)}.
calculate_msg_expiry(_State = #q{ttl = undefined}) ->
undefined;
calculate_msg_expiry(_State = #q{ttl = TTL}) ->
- now_millis() + (TTL * 1000).
+ now_millis() + (TTL * 1000).
drop_expired_messages(State = #q{ttl = undefined}) ->
State;
-drop_expired_messages(State = #q{backing_queue_state = BQS,
- backing_queue = BQ}) ->
+drop_expired_messages(State = #q{backing_queue_state = BQS,
+ backing_queue = BQ}) ->
Now = now_millis(),
BQS1 = BQ:dropwhile(
- fun (_MsgProperties = #msg_properties{expiry=Expiry}) ->
+ fun (_MsgProperties = #message_properties{expiry = Expiry}) ->
Now > Expiry
end, BQS),
ensure_ttl_timer(State #q{backing_queue_state = BQS1}).
-ensure_ttl_timer(State = #q{backing_queue = BQ,
- backing_queue_state = BQS,
- ttl = TTL,
- ttl_timer_ref = undefined})
+ensure_ttl_timer(State = #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ ttl = TTL,
+ ttl_timer_ref = undefined})
when TTL =/= undefined->
case BQ:is_empty(BQS) of
true ->
State;
false ->
- State#q{ttl_timer_ref =
+ State#q{ttl_timer_ref =
timer:send_after(TTL, self(), drop_expired)}
end;
ensure_ttl_timer(State) ->
@@ -622,8 +623,7 @@ ensure_ttl_timer(State) ->
now_millis() ->
timer:now_diff(now(), {0,0,0}).
-
-
+
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(name, #q{q = #amqqueue{name = Name}}) -> Name;
@@ -752,7 +752,7 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) ->
%%
%% we don't need an expiry here because messages are not being
- %% enqueued, so we use an empty msg_properties.
+ %% enqueued, so we use an empty message_properties.
{Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State),
reply(Delivered, NewState);
@@ -781,7 +781,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
case fetch(AckRequired, drop_expired_messages(State1)) of
- {empty, State2} ->
+ {empty, State2} ->
reply(empty, State2);
{{Message, IsDelivered, AckTag, Remaining}, State2} ->
case AckRequired of
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index 513b14df..11056c8e 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -69,7 +69,8 @@
-type(pmsg() :: {rabbit_amqqueue:name(), pkey()}).
-type(work_item() ::
- {publish, rabbit_types:message(), rabbit_types:msg_properties(), pmsg()} |
+ {publish,
+ rabbit_types:message(), rabbit_types:message_properties(), pmsg()} |
{deliver, pmsg()} |
{ack, pmsg()}).
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 6568aa70..c5a3da53 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -141,7 +141,7 @@
-define(REL_SEQ_ONLY_ENTRY_LENGTH_BYTES, 2).
%% publish record is binary 1 followed by a bit for is_persistent,
-%% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits
+%% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits
%% of md5sum msg id
-define(PUBLISH_PREFIX, 1).
-define(PUBLISH_PREFIX_BITS, 1).
@@ -205,15 +205,16 @@
{'undefined' | non_neg_integer(), [any()], qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
-spec(delete_and_terminate/1 :: (qistate()) -> qistate()).
--spec(publish/5 :: (rabbit_guid:guid(), seq_id(), rabbit_types:msg_properties(),
- boolean(), qistate()) -> qistate()).
+-spec(publish/5 :: (rabbit_guid:guid(), seq_id(),
+ rabbit_types:message_properties(), boolean(), qistate())
+ -> qistate()).
-spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(ack/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(sync/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(flush/1 :: (qistate()) -> qistate()).
-spec(read/3 :: (seq_id(), seq_id(), qistate()) ->
- {[{rabbit_guid:guid(), seq_id(),
- rabbit_types:msg_properties(),
+ {[{rabbit_guid:guid(), seq_id(),
+ rabbit_types:message_properties(),
boolean(), boolean()}], qistate()}).
-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
-spec(bounds/1 :: (qistate()) ->
@@ -258,7 +259,7 @@ delete_and_terminate(State) ->
ok = rabbit_misc:recursive_delete([Dir]),
State1.
-publish(Guid, SeqId, MsgProperties, IsPersistent, State)
+publish(Guid, SeqId, MsgProperties, IsPersistent, State)
when is_binary(Guid) ->
?GUID_BYTES = size(Guid),
{JournalHdl, State1} = get_journal_handle(State),
@@ -266,7 +267,7 @@ publish(Guid, SeqId, MsgProperties, IsPersistent, State)
JournalHdl, [<<(case IsPersistent of
true -> ?PUB_PERSIST_JPREFIX;
false -> ?PUB_TRANS_JPREFIX
- end):?JPREFIX_BITS,
+ end):?JPREFIX_BITS,
SeqId:?SEQ_BITS>>,
create_pub_record_body(Guid, MsgProperties)]),
maybe_flush_journal(
@@ -463,8 +464,8 @@ recover_segment(ContainsCheckFun, CleanShutdown,
{SegEntries1, UnackedCountDelta} =
segment_plus_journal(SegEntries, JEntries),
array:sparse_foldl(
- fun (RelSeq,
- {{Guid, _MsgProperties, _IsPersistent}, Del, no_ack},
+ fun (RelSeq,
+ {{Guid, _MsgProperties, _IsPersistent}, Del, no_ack},
Segment1) ->
recover_message(ContainsCheckFun(Guid), CleanShutdown,
Del, RelSeq, Segment1)
@@ -518,8 +519,8 @@ queue_index_walker_reader(QueueName, Gatherer) ->
State = #qistate { segments = Segments, dir = Dir } =
recover_journal(blank_state(QueueName)),
[ok = segment_entries_foldr(
- fun (_RelSeq,
- {{Guid, _MsgProps, true}, _IsDelivered, no_ack},
+ fun (_RelSeq,
+ {{Guid, _MsgProps, true}, _IsDelivered, no_ack},
ok) ->
gatherer:in(Gatherer, {Guid, 1});
(_RelSeq, _Value, Acc) ->
@@ -533,7 +534,7 @@ queue_index_walker_reader(QueueName, Gatherer) ->
%% expiry/binary manipulation
%%----------------------------------------------------------------------------
-create_pub_record_body(Guid, #msg_properties{expiry = Expiry}) ->
+create_pub_record_body(Guid, #message_properties{expiry = Expiry}) ->
[Guid, expiry_to_binary(Expiry)].
expiry_to_binary(undefined) ->
@@ -552,11 +553,11 @@ read_pub_record_body(Hdl) ->
?NO_EXPIRY -> undefined;
X -> X
end,
- {Guid, #msg_properties{expiry = Exp}};
+ {Guid, #message_properties{expiry = Exp}};
Error ->
Error
end.
-
+
%%----------------------------------------------------------------------------
%% journal manipulation
%%----------------------------------------------------------------------------
@@ -806,8 +807,8 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq},
{Messages, Segments}, Dir) ->
Segment = segment_find_or_new(Seg, Dir, Segments),
{segment_entries_foldr(
- fun (RelSeq,
- {{Guid, MsgProperties, IsPersistent}, IsDelivered, no_ack},
+ fun (RelSeq,
+ {{Guid, MsgProperties, IsPersistent}, IsDelivered, no_ack},
Acc)
when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso
(Seg < EndSeg orelse EndRelSeq >= RelSeq) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index d2489685..638a45e1 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1639,7 +1639,7 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
fun (SeqId, {QiN, SeqIdsGuidsAcc, MSCStateN}) ->
Guid = rabbit_guid:guid(),
QiM = rabbit_queue_index:publish(
- Guid, SeqId, #msg_properties{}, Persistent, QiN),
+ Guid, SeqId, #message_properties{}, Persistent, QiN),
{ok, MSCStateM} = rabbit_msg_store:write(MsgStore, Guid,
Guid, MSCStateN),
{QiM, [{SeqId, Guid} | SeqIdsGuidsAcc], MSCStateM}
@@ -1661,9 +1661,9 @@ test_queue_index_props() ->
with_empty_test_queue(
fun(Qi0) ->
Guid = rabbit_guid:guid(),
- Props = #msg_properties{expiry=12345},
+ Props = #message_properties{expiry=12345},
Qi1 = rabbit_queue_index:publish(Guid, 1, Props, true, Qi0),
- {[{Guid, 1, Props, _, _}], Qi2} =
+ {[{Guid, 1, Props, _, _}], Qi2} =
rabbit_queue_index:read(1, 2, Qi1),
Qi2
end),
@@ -1802,12 +1802,12 @@ variable_queue_publish(IsPersistent, Count, VQ) ->
fun (_N, VQN) ->
rabbit_variable_queue:publish(
rabbit_basic:message(
- rabbit_misc:r(<<>>, exchange, <<>>),
+ rabbit_misc:r(<<>>, exchange, <<>>),
<<>>, #'P_basic'{delivery_mode = case IsPersistent of
true -> 2;
false -> 1
- end}, <<>>),
- #msg_properties{}, VQN)
+ end}, <<>>),
+ #message_properties{}, VQN)
end, VQ, lists:seq(1, Count)).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
@@ -1853,14 +1853,14 @@ test_dropwhile(VQ0) ->
fun (N, VQN) ->
rabbit_variable_queue:publish(
rabbit_basic:message(
- rabbit_misc:r(<<>>, exchange, <<>>),
- <<>>, #'P_basic'{}, <<>>),
- #msg_properties{expiry = N}, VQN)
+ rabbit_misc:r(<<>>, exchange, <<>>),
+ <<>>, #'P_basic'{}, <<>>),
+ #message_properties{expiry = N}, VQN)
end, VQ0, lists:seq(1, Count)),
%% drop the first 5 messages
VQ2 = rabbit_variable_queue:dropwhile(
- fun(#msg_properties { expiry = Expiry }) ->
+ fun(#message_properties { expiry = Expiry }) ->
Expiry =< 5
end, VQ1),
@@ -1875,7 +1875,7 @@ test_dropwhile(VQ0) ->
{empty, VQ4} = rabbit_variable_queue:fetch(false, VQ3),
VQ4.
-
+
test_variable_queue_dynamic_duration_change(VQ0) ->
SegmentSize = rabbit_queue_index:next_segment_boundary(0),
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 1db23883..7671267c 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -42,7 +42,7 @@
binding/0, amqqueue/0, exchange/0, connection/0, protocol/0,
user/0, ok/1, error/1, ok_or_error/1, ok_or_error2/2,
ok_pid_or_error/0, channel_exit/0, connection_exit/0,
- msg_properties/0]).
+ message_properties/0]).
-type(channel_exit() :: no_return()).
-type(connection_exit() :: no_return()).
@@ -87,8 +87,8 @@
txn :: maybe(txn()),
sender :: pid(),
message :: message()}).
--type(msg_properties() ::
- #msg_properties{expiry :: pos_integer() | 'undefined'}).
+-type(message_properties() ::
+ #message_properties{expiry :: pos_integer() | 'undefined'}).
%% this is really an abstract type, but dialyzer does not support them
-type(txn() :: rabbit_guid:guid()).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 78958717..72fa4aeb 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -524,22 +524,22 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent },
in_counter = InCount + 1,
persistent_count = PCount1,
pending_ack = PA1 })}.
-
+
dropwhile(Pred, State) ->
case internal_queue_out(
fun(MsgStatus = #msg_status { msg_properties = MsgProps },
State1) ->
case Pred(MsgProps) of
true ->
- {_, State2} = internal_fetch(false,
+ {_, State2} = internal_fetch(false,
MsgStatus, State1),
- dropwhile(Pred, State2);
+ dropwhile(Pred, State2);
false ->
%% message needs to go back into Q4 (or
%% maybe go in for the first time if it was
%% loaded from Q3). Also the msg contents
%% might not be in RAM, so read them in now
- {MsgStatus1, State2 = #vqstate { q4 = Q4 }} =
+ {MsgStatus1, State2 = #vqstate { q4 = Q4 }} =
read_msg(MsgStatus, State1),
State2 #vqstate {q4 = queue:in_r(MsgStatus1, Q4)}
end
@@ -550,11 +550,11 @@ dropwhile(Pred, State) ->
fetch(AckRequired, State) ->
internal_queue_out(
- fun(MsgStatus, State1) ->
+ fun(MsgStatus, State1) ->
%% it's possible that the message wasn't read from disk
%% at this point, so read it in.
{MsgStatus1, State2} = read_msg(MsgStatus, State1),
- internal_fetch(AckRequired, MsgStatus1, State2)
+ internal_fetch(AckRequired, MsgStatus1, State2)
end, State).
internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) ->
@@ -568,20 +568,20 @@ internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) ->
Fun(MsgStatus, State #vqstate { q4 = Q4a })
end.
-read_msg(MsgStatus = #msg_status { msg = undefined,
- guid = Guid,
+read_msg(MsgStatus = #msg_status { msg = undefined,
+ guid = Guid,
index_on_disk = IndexOnDisk,
- is_persistent = IsPersistent },
- State = #vqstate { ram_msg_count = RamMsgCount,
- ram_index_count = RamIndexCount,
+ is_persistent = IsPersistent },
+ State = #vqstate { ram_msg_count = RamMsgCount,
+ ram_index_count = RamIndexCount,
msg_store_clients = MSCState}) ->
- {{ok, Msg = #basic_message {}}, MSCState1} =
+ {{ok, Msg = #basic_message {}}, MSCState1} =
read_from_msg_store(MSCState, IsPersistent, Guid),
RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk),
true = RamIndexCount1 >= 0, %% ASSERTION
- {MsgStatus #msg_status { msg = Msg },
+ {MsgStatus #msg_status { msg = Msg },
State #vqstate { ram_msg_count = RamMsgCount + 1,
ram_index_count = RamIndexCount1,
msg_store_clients = MSCState1 }};
@@ -590,12 +590,12 @@ read_msg(MsgStatus, State) ->
internal_fetch(AckRequired,
MsgStatus = #msg_status {
- msg = Msg, guid = Guid, seq_id = SeqId,
+ msg = Msg, guid = Guid, seq_id = SeqId,
is_persistent = IsPersistent, is_delivered = IsDelivered,
- msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk },
- State = #vqstate {
- ram_msg_count = RamMsgCount, out_counter = OutCount,
- index_state = IndexState, len = Len, persistent_count = PCount,
+ msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk },
+ State = #vqstate {
+ ram_msg_count = RamMsgCount, out_counter = OutCount,
+ index_state = IndexState, len = Len, persistent_count = PCount,
pending_ack = PA }) ->
%% 1. Mark it delivered if necessary
IndexState1 = maybe_write_delivered(
@@ -637,7 +637,7 @@ internal_fetch(AckRequired,
len = Len1,
persistent_count = PCount1,
pending_ack = PA1 })}.
-
+
ack(AckTags, State) ->
a(ack(fun rabbit_msg_store:remove/2,
fun (_AckEntry, State1) -> State1 end,
@@ -682,20 +682,20 @@ tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable }) ->
a(case IsDurable andalso HasPersistentPubs of
true -> ok = rabbit_msg_store:sync(
?PERSISTENT_MSG_STORE, PersistentGuids,
- msg_store_callback(PersistentGuids,Pubs, AckTags1,
+ msg_store_callback(PersistentGuids,Pubs, AckTags1,
Fun, MsgPropsFun)),
State;
- false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1,
+ false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1,
Fun, MsgPropsFun, State)
end)}.
requeue(AckTags, MsgPropsFun, State) ->
a(reduce_memory_use(
ack(fun rabbit_msg_store:release/2,
- fun (#msg_status { msg = Msg,
+ fun (#msg_status { msg = Msg,
msg_properties = MsgProperties }, State1) ->
- {_SeqId, State2} =
- publish(Msg, MsgPropsFun(MsgProperties), true,
+ {_SeqId, State2} =
+ publish(Msg, MsgPropsFun(MsgProperties), true,
false, State1),
State2;
({IsPersistent, Guid, MsgProperties}, State1) ->
@@ -852,7 +852,7 @@ one_if(false) -> 0.
cons_if(true, E, L) -> [E | L];
cons_if(false, _E, L) -> L.
-msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid },
+msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid },
MsgProperties) ->
#msg_status { seq_id = SeqId, guid = Guid, msg = Msg,
is_persistent = IsPersistent, is_delivered = false,
@@ -892,8 +892,8 @@ store_tx(Txn, Tx) -> put({txn, Txn}, Tx).
erase_tx(Txn) -> erase({txn, Txn}).
persistent_guids(Pubs) ->
- [Guid ||
- {#basic_message { guid = Guid, is_persistent = true },
+ [Guid ||
+ {#basic_message { guid = Guid, is_persistent = true },
_MsgProps} <- Pubs].
betas_from_index_entries(List, TransientThreshold, IndexState) ->
@@ -963,7 +963,7 @@ msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) ->
Self = self(),
F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
Self, fun (StateN) -> tx_commit_post_msg_store(
- true, Pubs, AckTags,
+ true, Pubs, AckTags,
Fun, MsgPropsFun, StateN)
end)
end,
@@ -989,7 +989,7 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun,
true -> [AckTag || AckTag <- AckTags,
case dict:fetch(AckTag, PA) of
#msg_status {} -> false;
- {IsPersistent,
+ {IsPersistent,
_Guid, _MsgProps} -> IsPersistent
end];
false -> []
@@ -1026,11 +1026,11 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
Pubs = lists:append(lists:reverse(SPubs)),
{SeqIds, State1 = #vqstate { index_state = IndexState }} =
lists:foldl(
- fun ({Msg = #basic_message { is_persistent = IsPersistent },
+ fun ({Msg = #basic_message { is_persistent = IsPersistent },
MsgProperties},
{SeqIdsAcc, State2}) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- {SeqId, State3} =
+ {SeqId, State3} =
publish(Msg, MsgProperties, false, IsPersistent1, State2),
{cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3}
end, {PAcks, ack(Acks, State)}, Pubs),
@@ -1098,7 +1098,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent },
ram_msg_count = RamMsgCount }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProperties))
- #msg_status { is_delivered = IsDelivered,
+ #msg_status { is_delivered = IsDelivered,
msg_on_disk = MsgOnDisk},
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = case bpqueue:is_empty(Q3) of
@@ -1146,8 +1146,8 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
IndexState)
when Force orelse IsPersistent ->
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
- IndexState1 = rabbit_queue_index:publish(Guid,
- SeqId,
+ IndexState1 = rabbit_queue_index:publish(Guid,
+ SeqId,
MsgProperties,
IsPersistent,
IndexState),
@@ -1172,8 +1172,8 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId,
is_persistent = IsPersistent,
- msg_on_disk = MsgOnDisk,
- msg_properties = MsgProperties } = MsgStatus,
+ msg_on_disk = MsgOnDisk,
+ msg_properties = MsgProperties } = MsgStatus,
PA) ->
AckEntry = case MsgOnDisk of
true -> {IsPersistent, Guid, MsgProperties};
@@ -1227,8 +1227,8 @@ accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
index_on_disk = false }, Acc) ->
Acc;
-accumulate_ack(SeqId,
- {IsPersistent, Guid, _MsgProperties},
+accumulate_ack(SeqId,
+ {IsPersistent, Guid, _MsgProperties},
{SeqIdsAcc, Dict}) ->
{cons_if(IsPersistent, SeqId, SeqIdsAcc),
rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}.