summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-10-21 16:55:07 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-10-21 16:55:07 +0100
commit8ae2c303acb5def8c33b05acf49a9c0b3cfcded3 (patch)
tree58689fd4e5772e23569dfba6e90ffb1cbc954fdd
parent4a84bd9d7908931bbe7ce22e7f24129b66e6e7f7 (diff)
downloadrabbitmq-server-8ae2c303acb5def8c33b05acf49a9c0b3cfcded3.tar.gz
Correct spec for sync(), and some cosmetics
-rw-r--r--src/rabbit_tests.erl3
-rw-r--r--src/rabbit_variable_queue.erl17
2 files changed, 9 insertions, 11 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 0db1150b..00547a26 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1909,8 +1909,7 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
VQ0;
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
- {{_Msg, false, AckTag, Len}, VQ2} =
- rabbit_variable_queue:fetch(true, VQ1),
+ {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
publish_fetch_and_ack(N-1, Len, rabbit_variable_queue:ack([AckTag], VQ2)).
test_variable_queue_partial_segments_delta_thing(VQ0) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 587bc939..0b948c1b 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -295,7 +295,8 @@
-type(sync() :: #sync { acks_persistent :: [[seq_id()]],
acks_all :: [[seq_id()]],
- pubs :: [[rabbit_guid:guid()]],
+ pubs :: [{message_properties_transformer(),
+ [rabbit_types:basic_message()]}],
funs :: [fun (() -> any())] }).
-type(state() :: #vqstate {
@@ -1010,7 +1011,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
durable = IsDurable }) ->
PAcks = lists:append(SPAcks),
Acks = lists:append(SAcks),
- Pubs = lists:foldl(
+ Pubs = lists:foldl(
fun({Fun, PubsN}, OuterAcc) ->
lists:foldl(
fun({Msg, MsgProps}, Acc) ->
@@ -1091,8 +1092,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent },
ram_msg_count = RamMsgCount }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProperties))
- #msg_status { is_delivered = IsDelivered,
- msg_on_disk = MsgOnDisk},
+ #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk},
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = case bpqueue:is_empty(Q3) of
false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) };
@@ -1131,10 +1131,10 @@ maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
{MsgStatus, IndexState};
maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
- guid = Guid,
- seq_id = SeqId,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
+ guid = Guid,
+ seq_id = SeqId,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
msg_properties = MsgProperties},
IndexState)
when Force orelse IsPersistent ->
@@ -1337,7 +1337,6 @@ fetch_from_q3(State = #vqstate {
{{value, IndexOnDisk, MsgStatus}, Q3a} ->
RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk),
true = RamIndexCount1 >= 0, %% ASSERTION
-
State1 = State #vqstate { q3 = Q3a,
ram_index_count = RamIndexCount1 },
State2 =