diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-10-21 16:55:07 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-10-21 16:55:07 +0100 |
commit | 8ae2c303acb5def8c33b05acf49a9c0b3cfcded3 (patch) | |
tree | 58689fd4e5772e23569dfba6e90ffb1cbc954fdd | |
parent | 4a84bd9d7908931bbe7ce22e7f24129b66e6e7f7 (diff) | |
download | rabbitmq-server-8ae2c303acb5def8c33b05acf49a9c0b3cfcded3.tar.gz |
Correct spec for sync(), and some cosmetics
-rw-r--r-- | src/rabbit_tests.erl | 3 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 17 |
2 files changed, 9 insertions, 11 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 0db1150b..00547a26 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1909,8 +1909,7 @@ publish_fetch_and_ack(0, _Len, VQ0) -> VQ0; publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), - {{_Msg, false, AckTag, Len}, VQ2} = - rabbit_variable_queue:fetch(true, VQ1), + {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), publish_fetch_and_ack(N-1, Len, rabbit_variable_queue:ack([AckTag], VQ2)). test_variable_queue_partial_segments_delta_thing(VQ0) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 587bc939..0b948c1b 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -295,7 +295,8 @@ -type(sync() :: #sync { acks_persistent :: [[seq_id()]], acks_all :: [[seq_id()]], - pubs :: [[rabbit_guid:guid()]], + pubs :: [{message_properties_transformer(), + [rabbit_types:basic_message()]}], funs :: [fun (() -> any())] }). -type(state() :: #vqstate { @@ -1010,7 +1011,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync { durable = IsDurable }) -> PAcks = lists:append(SPAcks), Acks = lists:append(SAcks), - Pubs = lists:foldl( + Pubs = lists:foldl( fun({Fun, PubsN}, OuterAcc) -> lists:foldl( fun({Msg, MsgProps}, Acc) -> @@ -1091,8 +1092,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, ram_msg_count = RamMsgCount }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProperties)) - #msg_status { is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk}, + #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk}, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = case bpqueue:is_empty(Q3) of false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) }; @@ -1131,10 +1131,10 @@ maybe_write_index_to_disk(_Force, MsgStatus = #msg_status { true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION {MsgStatus, IndexState}; maybe_write_index_to_disk(Force, MsgStatus = #msg_status { - guid = Guid, - seq_id = SeqId, - is_persistent = IsPersistent, - is_delivered = IsDelivered, + guid = Guid, + seq_id = SeqId, + is_persistent = IsPersistent, + is_delivered = IsDelivered, msg_properties = MsgProperties}, IndexState) when Force orelse IsPersistent -> @@ -1337,7 +1337,6 @@ fetch_from_q3(State = #vqstate { {{value, IndexOnDisk, MsgStatus}, Q3a} -> RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk), true = RamIndexCount1 >= 0, %% ASSERTION - State1 = State #vqstate { q3 = Q3a, ram_index_count = RamIndexCount1 }, State2 = |