diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-10-06 15:29:53 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-10-06 15:29:53 +0100 |
commit | a692dd448ef31fd8f58160a1ba8cbe743e16d2ed (patch) | |
tree | c60f15e265bedb532557be235f994559af67c974 | |
parent | 3f050dad0471e3bb71123f45290f3da7ce751f9d (diff) | |
download | rabbitmq-server-a692dd448ef31fd8f58160a1ba8cbe743e16d2ed.tar.gz |
msg_properties -> message_properties in order to be consistent with message and basic_message (though within vq, we have plenty of msg_-prefixes so don't bother inside the msg_status record in there). Also, tidied up a lot of trailing whitespace
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 29 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 66 | ||||
-rw-r--r-- | src/rabbit_persister.erl | 3 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 33 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 22 | ||||
-rw-r--r-- | src/rabbit_types.erl | 6 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 76 |
8 files changed, 120 insertions, 117 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 0f0a0e87..f924878d 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -74,7 +74,7 @@ -record(event, {type, props, timestamp}). --record(msg_properties, {expiry}). +-record(message_properties, {expiry}). %%---------------------------------------------------------------------------- diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 12d32363..dcff5b37 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -34,13 +34,14 @@ %% Message, IsDelivered, AckTag, Remaining_Len {rabbit_types:basic_message(), boolean(), ack(), non_neg_integer()})). -type(peek_result() :: ('empty'|{rabbit_types:basic_message(), - rabbit_types:msg_properties()})). + rabbit_types:message_properties()})). -type(is_durable() :: boolean()). -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). -type(ack_required() :: boolean()). --type(msg_properties_transformer() :: - fun ((rabbit_types:msg_properties()) -> rabbit_types:msg_properties())). +-type(message_properties_transformer() :: + fun ((rabbit_types:message_properties()) + -> rabbit_types:message_properties())). -spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok'). -spec(stop/0 :: () -> 'ok'). @@ -49,26 +50,26 @@ -spec(terminate/1 :: (state()) -> state()). -spec(delete_and_terminate/1 :: (state()) -> state()). -spec(purge/1 :: (state()) -> {purged_msg_count(), state()}). --spec(publish/3 :: - (rabbit_types:basic_message(), rabbit_types:msg_properties(), state()) - -> state()). +-spec(publish/3 :: (rabbit_types:basic_message(), + rabbit_types:message_properties_properties(), state()) + -> state()). -spec(publish_delivered/4 :: (ack_required(), rabbit_types:basic_message(), - rabbit_types:msg_properties(), state()) - -> {ack(), state()}). + rabbit_types:message_properties(), state()) + -> {ack(), state()}). -spec(dropwhile/2 :: - (fun ((rabbit_types:msg_properties()) -> boolean()), state()) + (fun ((rabbit_types:message_properties()) -> boolean()), state()) -> state()). -spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}). -spec(ack/2 :: ([ack()], state()) -> state()). --spec(tx_publish/4 :: - (rabbit_types:txn(), rabbit_types:basic_message(), - rabbit_types:msg_properties(), state()) -> state()). +-spec(tx_publish/4 :: (rabbit_types:txn(), rabbit_types:basic_message(), + rabbit_types:message_properties(), state()) -> state()). -spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()). -spec(tx_rollback/2 :: (rabbit_types:txn(), state()) -> {[ack()], state()}). -spec(tx_commit/4 :: (rabbit_types:txn(), fun (() -> any()), - msg_properties_transformer(), state()) -> {[ack()], state()}). --spec(requeue/3 :: ([ack()], msg_properties_transformer(), state()) -> state()). + message_properties_transformer(), state()) -> {[ack()], state()}). +-spec(requeue/3 :: ([ack()], message_properties_transformer(), state()) + -> state()). -spec(len/1 :: (state()) -> non_neg_integer()). -spec(is_empty/1 :: (state()) -> boolean()). -spec(set_ram_duration_target/2 :: diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 53b98490..91d3f586 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -39,7 +39,7 @@ -define(SYNC_INTERVAL, 5). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). --define(BASE_MSG_PROPERTIES, #msg_properties{expiry = undefined}). +-define(BASE_MESSAGE_PROPERTIES, #message_properties{expiry = undefined}). -export([start_link/1, info_keys/0]). @@ -152,7 +152,7 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- init_queue_state(State) -> - lists:foldl(fun(F, S) -> F(S) end, State, + lists:foldl(fun(F, S) -> F(S) end, State, [fun init_expires/1, fun init_ttl/1]). init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) -> @@ -413,7 +413,7 @@ deliver_from_queue_deliver(AckRequired, false, State) -> run_message_queue(State) -> Funs = {fun deliver_from_queue_pred/2, fun deliver_from_queue_deliver/3}, - State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = + State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = drop_expired_messages(State), IsEmpty = BQ:is_empty(BQS), {_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1), @@ -424,17 +424,18 @@ attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> DeliverFun = fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> {AckTag, BQS1} = - BQ:publish_delivered(AckRequired, Message, - #msg_properties{}, BQS), + BQ:publish_delivered(AckRequired, Message, + #message_properties{}, BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} end, deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); -attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, +attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> record_current_channel_tx(ChPid, Txn), - {true, State#q{backing_queue_state = - BQ:tx_publish(Txn, Message, #msg_properties{}, BQS)}}. + {true, + State#q{backing_queue_state = + BQ:tx_publish(Txn, Message, #message_properties{}, BQS)}}. deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> case attempt_delivery(Txn, ChPid, Message, State) of @@ -442,25 +443,25 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> {true, NewState}; {false, NewState} -> %% Txn is none and no unblocked channels with consumers - BQS = BQ:publish(Message, - msg_properties(State), + BQS = BQ:publish(Message, + message_properties(State), State #q.backing_queue_state), {false, ensure_ttl_timer(NewState#q{backing_queue_state = BQS})} end. requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> maybe_run_queue_via_backing_queue( - fun (BQS) -> + fun (BQS) -> BQ:requeue(AckTags, reset_msg_expiry_fun(State), BQS) end, State). -fetch(AckRequired, State = #q{backing_queue_state = BQS, +fetch(AckRequired, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> case BQ:fetch(AckRequired, BQS) of - {empty, BQS1} -> + {empty, BQS1} -> {empty, State#q{backing_queue_state = BQS1}}; {{Message, IsDelivered, AckTag, Remaining}, BQS1} -> - {{Message, IsDelivered, AckTag, Remaining}, + {{Message, IsDelivered, AckTag, Remaining}, State#q{backing_queue_state = BQS1}} end. @@ -559,9 +560,9 @@ maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - {AckTags, BQS1} = BQ:tx_commit(Txn, - fun () -> gen_server2:reply(From, ok) end, - reset_msg_expiry_fun(State), + {AckTags, BQS1} = BQ:tx_commit(Txn, + fun () -> gen_server2:reply(From, ok) end, + reset_msg_expiry_fun(State), BQS), %% ChPid must be known here because of the participant management %% by the channel. @@ -583,38 +584,38 @@ subtract_acks(A, B) when is_list(B) -> reset_msg_expiry_fun(State) -> fun(MsgProps) -> - MsgProps#msg_properties{expiry=calculate_msg_expiry(State)} + MsgProps#message_properties{expiry = calculate_msg_expiry(State)} end. -msg_properties(State) -> - #msg_properties{expiry = calculate_msg_expiry(State)}. +message_properties(State) -> + #message_properties{expiry = calculate_msg_expiry(State)}. calculate_msg_expiry(_State = #q{ttl = undefined}) -> undefined; calculate_msg_expiry(_State = #q{ttl = TTL}) -> - now_millis() + (TTL * 1000). + now_millis() + (TTL * 1000). drop_expired_messages(State = #q{ttl = undefined}) -> State; -drop_expired_messages(State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> +drop_expired_messages(State = #q{backing_queue_state = BQS, + backing_queue = BQ}) -> Now = now_millis(), BQS1 = BQ:dropwhile( - fun (_MsgProperties = #msg_properties{expiry=Expiry}) -> + fun (_MsgProperties = #message_properties{expiry = Expiry}) -> Now > Expiry end, BQS), ensure_ttl_timer(State #q{backing_queue_state = BQS1}). -ensure_ttl_timer(State = #q{backing_queue = BQ, - backing_queue_state = BQS, - ttl = TTL, - ttl_timer_ref = undefined}) +ensure_ttl_timer(State = #q{backing_queue = BQ, + backing_queue_state = BQS, + ttl = TTL, + ttl_timer_ref = undefined}) when TTL =/= undefined-> case BQ:is_empty(BQS) of true -> State; false -> - State#q{ttl_timer_ref = + State#q{ttl_timer_ref = timer:send_after(TTL, self(), drop_expired)} end; ensure_ttl_timer(State) -> @@ -622,8 +623,7 @@ ensure_ttl_timer(State) -> now_millis() -> timer:now_diff(now(), {0,0,0}). - - + infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(name, #q{q = #amqqueue{name = Name}}) -> Name; @@ -752,7 +752,7 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> %% %% we don't need an expiry here because messages are not being - %% enqueued, so we use an empty msg_properties. + %% enqueued, so we use an empty message_properties. {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State), reply(Delivered, NewState); @@ -781,7 +781,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, AckRequired = not NoAck, State1 = ensure_expiry_timer(State), case fetch(AckRequired, drop_expired_messages(State1)) of - {empty, State2} -> + {empty, State2} -> reply(empty, State2); {{Message, IsDelivered, AckTag, Remaining}, State2} -> case AckRequired of diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index 513b14df..11056c8e 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -69,7 +69,8 @@ -type(pmsg() :: {rabbit_amqqueue:name(), pkey()}). -type(work_item() :: - {publish, rabbit_types:message(), rabbit_types:msg_properties(), pmsg()} | + {publish, + rabbit_types:message(), rabbit_types:message_properties(), pmsg()} | {deliver, pmsg()} | {ack, pmsg()}). diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 6568aa70..c5a3da53 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -141,7 +141,7 @@ -define(REL_SEQ_ONLY_ENTRY_LENGTH_BYTES, 2). %% publish record is binary 1 followed by a bit for is_persistent, -%% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits +%% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits %% of md5sum msg id -define(PUBLISH_PREFIX, 1). -define(PUBLISH_PREFIX_BITS, 1). @@ -205,15 +205,16 @@ {'undefined' | non_neg_integer(), [any()], qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(delete_and_terminate/1 :: (qistate()) -> qistate()). --spec(publish/5 :: (rabbit_guid:guid(), seq_id(), rabbit_types:msg_properties(), - boolean(), qistate()) -> qistate()). +-spec(publish/5 :: (rabbit_guid:guid(), seq_id(), + rabbit_types:message_properties(), boolean(), qistate()) + -> qistate()). -spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()). -spec(ack/2 :: ([seq_id()], qistate()) -> qistate()). -spec(sync/2 :: ([seq_id()], qistate()) -> qistate()). -spec(flush/1 :: (qistate()) -> qistate()). -spec(read/3 :: (seq_id(), seq_id(), qistate()) -> - {[{rabbit_guid:guid(), seq_id(), - rabbit_types:msg_properties(), + {[{rabbit_guid:guid(), seq_id(), + rabbit_types:message_properties(), boolean(), boolean()}], qistate()}). -spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). -spec(bounds/1 :: (qistate()) -> @@ -258,7 +259,7 @@ delete_and_terminate(State) -> ok = rabbit_misc:recursive_delete([Dir]), State1. -publish(Guid, SeqId, MsgProperties, IsPersistent, State) +publish(Guid, SeqId, MsgProperties, IsPersistent, State) when is_binary(Guid) -> ?GUID_BYTES = size(Guid), {JournalHdl, State1} = get_journal_handle(State), @@ -266,7 +267,7 @@ publish(Guid, SeqId, MsgProperties, IsPersistent, State) JournalHdl, [<<(case IsPersistent of true -> ?PUB_PERSIST_JPREFIX; false -> ?PUB_TRANS_JPREFIX - end):?JPREFIX_BITS, + end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, create_pub_record_body(Guid, MsgProperties)]), maybe_flush_journal( @@ -463,8 +464,8 @@ recover_segment(ContainsCheckFun, CleanShutdown, {SegEntries1, UnackedCountDelta} = segment_plus_journal(SegEntries, JEntries), array:sparse_foldl( - fun (RelSeq, - {{Guid, _MsgProperties, _IsPersistent}, Del, no_ack}, + fun (RelSeq, + {{Guid, _MsgProperties, _IsPersistent}, Del, no_ack}, Segment1) -> recover_message(ContainsCheckFun(Guid), CleanShutdown, Del, RelSeq, Segment1) @@ -518,8 +519,8 @@ queue_index_walker_reader(QueueName, Gatherer) -> State = #qistate { segments = Segments, dir = Dir } = recover_journal(blank_state(QueueName)), [ok = segment_entries_foldr( - fun (_RelSeq, - {{Guid, _MsgProps, true}, _IsDelivered, no_ack}, + fun (_RelSeq, + {{Guid, _MsgProps, true}, _IsDelivered, no_ack}, ok) -> gatherer:in(Gatherer, {Guid, 1}); (_RelSeq, _Value, Acc) -> @@ -533,7 +534,7 @@ queue_index_walker_reader(QueueName, Gatherer) -> %% expiry/binary manipulation %%---------------------------------------------------------------------------- -create_pub_record_body(Guid, #msg_properties{expiry = Expiry}) -> +create_pub_record_body(Guid, #message_properties{expiry = Expiry}) -> [Guid, expiry_to_binary(Expiry)]. expiry_to_binary(undefined) -> @@ -552,11 +553,11 @@ read_pub_record_body(Hdl) -> ?NO_EXPIRY -> undefined; X -> X end, - {Guid, #msg_properties{expiry = Exp}}; + {Guid, #message_properties{expiry = Exp}}; Error -> Error end. - + %%---------------------------------------------------------------------------- %% journal manipulation %%---------------------------------------------------------------------------- @@ -806,8 +807,8 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq}, {Messages, Segments}, Dir) -> Segment = segment_find_or_new(Seg, Dir, Segments), {segment_entries_foldr( - fun (RelSeq, - {{Guid, MsgProperties, IsPersistent}, IsDelivered, no_ack}, + fun (RelSeq, + {{Guid, MsgProperties, IsPersistent}, IsDelivered, no_ack}, Acc) when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso (Seg < EndSeg orelse EndRelSeq >= RelSeq) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index d2489685..638a45e1 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1639,7 +1639,7 @@ queue_index_publish(SeqIds, Persistent, Qi) -> fun (SeqId, {QiN, SeqIdsGuidsAcc, MSCStateN}) -> Guid = rabbit_guid:guid(), QiM = rabbit_queue_index:publish( - Guid, SeqId, #msg_properties{}, Persistent, QiN), + Guid, SeqId, #message_properties{}, Persistent, QiN), {ok, MSCStateM} = rabbit_msg_store:write(MsgStore, Guid, Guid, MSCStateN), {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc], MSCStateM} @@ -1661,9 +1661,9 @@ test_queue_index_props() -> with_empty_test_queue( fun(Qi0) -> Guid = rabbit_guid:guid(), - Props = #msg_properties{expiry=12345}, + Props = #message_properties{expiry=12345}, Qi1 = rabbit_queue_index:publish(Guid, 1, Props, true, Qi0), - {[{Guid, 1, Props, _, _}], Qi2} = + {[{Guid, 1, Props, _, _}], Qi2} = rabbit_queue_index:read(1, 2, Qi1), Qi2 end), @@ -1802,12 +1802,12 @@ variable_queue_publish(IsPersistent, Count, VQ) -> fun (_N, VQN) -> rabbit_variable_queue:publish( rabbit_basic:message( - rabbit_misc:r(<<>>, exchange, <<>>), + rabbit_misc:r(<<>>, exchange, <<>>), <<>>, #'P_basic'{delivery_mode = case IsPersistent of true -> 2; false -> 1 - end}, <<>>), - #msg_properties{}, VQN) + end}, <<>>), + #message_properties{}, VQN) end, VQ, lists:seq(1, Count)). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> @@ -1853,14 +1853,14 @@ test_dropwhile(VQ0) -> fun (N, VQN) -> rabbit_variable_queue:publish( rabbit_basic:message( - rabbit_misc:r(<<>>, exchange, <<>>), - <<>>, #'P_basic'{}, <<>>), - #msg_properties{expiry = N}, VQN) + rabbit_misc:r(<<>>, exchange, <<>>), + <<>>, #'P_basic'{}, <<>>), + #message_properties{expiry = N}, VQN) end, VQ0, lists:seq(1, Count)), %% drop the first 5 messages VQ2 = rabbit_variable_queue:dropwhile( - fun(#msg_properties { expiry = Expiry }) -> + fun(#message_properties { expiry = Expiry }) -> Expiry =< 5 end, VQ1), @@ -1875,7 +1875,7 @@ test_dropwhile(VQ0) -> {empty, VQ4} = rabbit_variable_queue:fetch(false, VQ3), VQ4. - + test_variable_queue_dynamic_duration_change(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 1db23883..7671267c 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -42,7 +42,7 @@ binding/0, amqqueue/0, exchange/0, connection/0, protocol/0, user/0, ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, channel_exit/0, connection_exit/0, - msg_properties/0]). + message_properties/0]). -type(channel_exit() :: no_return()). -type(connection_exit() :: no_return()). @@ -87,8 +87,8 @@ txn :: maybe(txn()), sender :: pid(), message :: message()}). --type(msg_properties() :: - #msg_properties{expiry :: pos_integer() | 'undefined'}). +-type(message_properties() :: + #message_properties{expiry :: pos_integer() | 'undefined'}). %% this is really an abstract type, but dialyzer does not support them -type(txn() :: rabbit_guid:guid()). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 78958717..72fa4aeb 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -524,22 +524,22 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, in_counter = InCount + 1, persistent_count = PCount1, pending_ack = PA1 })}. - + dropwhile(Pred, State) -> case internal_queue_out( fun(MsgStatus = #msg_status { msg_properties = MsgProps }, State1) -> case Pred(MsgProps) of true -> - {_, State2} = internal_fetch(false, + {_, State2} = internal_fetch(false, MsgStatus, State1), - dropwhile(Pred, State2); + dropwhile(Pred, State2); false -> %% message needs to go back into Q4 (or %% maybe go in for the first time if it was %% loaded from Q3). Also the msg contents %% might not be in RAM, so read them in now - {MsgStatus1, State2 = #vqstate { q4 = Q4 }} = + {MsgStatus1, State2 = #vqstate { q4 = Q4 }} = read_msg(MsgStatus, State1), State2 #vqstate {q4 = queue:in_r(MsgStatus1, Q4)} end @@ -550,11 +550,11 @@ dropwhile(Pred, State) -> fetch(AckRequired, State) -> internal_queue_out( - fun(MsgStatus, State1) -> + fun(MsgStatus, State1) -> %% it's possible that the message wasn't read from disk %% at this point, so read it in. {MsgStatus1, State2} = read_msg(MsgStatus, State1), - internal_fetch(AckRequired, MsgStatus1, State2) + internal_fetch(AckRequired, MsgStatus1, State2) end, State). internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) -> @@ -568,20 +568,20 @@ internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) -> Fun(MsgStatus, State #vqstate { q4 = Q4a }) end. -read_msg(MsgStatus = #msg_status { msg = undefined, - guid = Guid, +read_msg(MsgStatus = #msg_status { msg = undefined, + guid = Guid, index_on_disk = IndexOnDisk, - is_persistent = IsPersistent }, - State = #vqstate { ram_msg_count = RamMsgCount, - ram_index_count = RamIndexCount, + is_persistent = IsPersistent }, + State = #vqstate { ram_msg_count = RamMsgCount, + ram_index_count = RamIndexCount, msg_store_clients = MSCState}) -> - {{ok, Msg = #basic_message {}}, MSCState1} = + {{ok, Msg = #basic_message {}}, MSCState1} = read_from_msg_store(MSCState, IsPersistent, Guid), RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk), true = RamIndexCount1 >= 0, %% ASSERTION - {MsgStatus #msg_status { msg = Msg }, + {MsgStatus #msg_status { msg = Msg }, State #vqstate { ram_msg_count = RamMsgCount + 1, ram_index_count = RamIndexCount1, msg_store_clients = MSCState1 }}; @@ -590,12 +590,12 @@ read_msg(MsgStatus, State) -> internal_fetch(AckRequired, MsgStatus = #msg_status { - msg = Msg, guid = Guid, seq_id = SeqId, + msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }, - State = #vqstate { - ram_msg_count = RamMsgCount, out_counter = OutCount, - index_state = IndexState, len = Len, persistent_count = PCount, + msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }, + State = #vqstate { + ram_msg_count = RamMsgCount, out_counter = OutCount, + index_state = IndexState, len = Len, persistent_count = PCount, pending_ack = PA }) -> %% 1. Mark it delivered if necessary IndexState1 = maybe_write_delivered( @@ -637,7 +637,7 @@ internal_fetch(AckRequired, len = Len1, persistent_count = PCount1, pending_ack = PA1 })}. - + ack(AckTags, State) -> a(ack(fun rabbit_msg_store:remove/2, fun (_AckEntry, State1) -> State1 end, @@ -682,20 +682,20 @@ tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable }) -> a(case IsDurable andalso HasPersistentPubs of true -> ok = rabbit_msg_store:sync( ?PERSISTENT_MSG_STORE, PersistentGuids, - msg_store_callback(PersistentGuids,Pubs, AckTags1, + msg_store_callback(PersistentGuids,Pubs, AckTags1, Fun, MsgPropsFun)), State; - false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1, + false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1, Fun, MsgPropsFun, State) end)}. requeue(AckTags, MsgPropsFun, State) -> a(reduce_memory_use( ack(fun rabbit_msg_store:release/2, - fun (#msg_status { msg = Msg, + fun (#msg_status { msg = Msg, msg_properties = MsgProperties }, State1) -> - {_SeqId, State2} = - publish(Msg, MsgPropsFun(MsgProperties), true, + {_SeqId, State2} = + publish(Msg, MsgPropsFun(MsgProperties), true, false, State1), State2; ({IsPersistent, Guid, MsgProperties}, State1) -> @@ -852,7 +852,7 @@ one_if(false) -> 0. cons_if(true, E, L) -> [E | L]; cons_if(false, _E, L) -> L. -msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, +msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, MsgProperties) -> #msg_status { seq_id = SeqId, guid = Guid, msg = Msg, is_persistent = IsPersistent, is_delivered = false, @@ -892,8 +892,8 @@ store_tx(Txn, Tx) -> put({txn, Txn}, Tx). erase_tx(Txn) -> erase({txn, Txn}). persistent_guids(Pubs) -> - [Guid || - {#basic_message { guid = Guid, is_persistent = true }, + [Guid || + {#basic_message { guid = Guid, is_persistent = true }, _MsgProps} <- Pubs]. betas_from_index_entries(List, TransientThreshold, IndexState) -> @@ -963,7 +963,7 @@ msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) -> Self = self(), F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( Self, fun (StateN) -> tx_commit_post_msg_store( - true, Pubs, AckTags, + true, Pubs, AckTags, Fun, MsgPropsFun, StateN) end) end, @@ -989,7 +989,7 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun, true -> [AckTag || AckTag <- AckTags, case dict:fetch(AckTag, PA) of #msg_status {} -> false; - {IsPersistent, + {IsPersistent, _Guid, _MsgProps} -> IsPersistent end]; false -> [] @@ -1026,11 +1026,11 @@ tx_commit_index(State = #vqstate { on_sync = #sync { Pubs = lists:append(lists:reverse(SPubs)), {SeqIds, State1 = #vqstate { index_state = IndexState }} = lists:foldl( - fun ({Msg = #basic_message { is_persistent = IsPersistent }, + fun ({Msg = #basic_message { is_persistent = IsPersistent }, MsgProperties}, {SeqIdsAcc, State2}) -> IsPersistent1 = IsDurable andalso IsPersistent, - {SeqId, State3} = + {SeqId, State3} = publish(Msg, MsgProperties, false, IsPersistent1, State2), {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} end, {PAcks, ack(Acks, State)}, Pubs), @@ -1098,7 +1098,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, ram_msg_count = RamMsgCount }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProperties)) - #msg_status { is_delivered = IsDelivered, + #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk}, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = case bpqueue:is_empty(Q3) of @@ -1146,8 +1146,8 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status { IndexState) when Force orelse IsPersistent -> true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION - IndexState1 = rabbit_queue_index:publish(Guid, - SeqId, + IndexState1 = rabbit_queue_index:publish(Guid, + SeqId, MsgProperties, IsPersistent, IndexState), @@ -1172,8 +1172,8 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, - msg_on_disk = MsgOnDisk, - msg_properties = MsgProperties } = MsgStatus, + msg_on_disk = MsgOnDisk, + msg_properties = MsgProperties } = MsgStatus, PA) -> AckEntry = case MsgOnDisk of true -> {IsPersistent, Guid, MsgProperties}; @@ -1227,8 +1227,8 @@ accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, index_on_disk = false }, Acc) -> Acc; -accumulate_ack(SeqId, - {IsPersistent, Guid, _MsgProperties}, +accumulate_ack(SeqId, + {IsPersistent, Guid, _MsgProperties}, {SeqIdsAcc, Dict}) -> {cons_if(IsPersistent, SeqId, SeqIdsAcc), rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}. |