diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-11-22 15:26:36 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-11-22 15:26:36 +0000 |
commit | ee69523f13ef37026922ed973965e5e953827a3a (patch) | |
tree | a0bec051637af96b0b5063f84ffdd75763699957 | |
parent | 7f1a4cd62f261734e84a12631ac47a5d1d56f0fd (diff) | |
parent | 2e35c99322e769091fbc582ef0c6354dbd693379 (diff) | |
download | rabbitmq-server-ee69523f13ef37026922ed973965e5e953827a3a.tar.gz |
Merge bug25305
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 6 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 7 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 24 |
3 files changed, 24 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d8b20335..1df05922 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -485,9 +485,11 @@ deliver_msg_to_consumer(DeliverFun, {Stop, State1}. deliver_from_queue_deliver(AckRequired, State) -> - {{Message, IsDelivered, AckTag, Remaining}, State1} = + {{Message, IsDelivered, AckTag, _Remaining}, State1} = fetch(AckRequired, State), - {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. + State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = + drop_expired_messages(State1), + {{Message, IsDelivered, AckTag}, BQ:is_empty(BQS), State2}. confirm_messages([], State) -> State; diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 096f9490..983abf29 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2300,6 +2300,7 @@ test_variable_queue() -> fun test_variable_queue_partial_segments_delta_thing/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1, + fun test_variable_queue_fold_msg_on_disk/1, fun test_dropwhile/1, fun test_dropwhile_varying_ram_duration/1, fun test_variable_queue_ack_limiting/1, @@ -2515,6 +2516,12 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. +test_variable_queue_fold_msg_on_disk(VQ0) -> + VQ1 = variable_queue_publish(true, 1, VQ0), + {VQ2, AckTags} = variable_queue_fetch(1, true, false, 1, VQ1), + VQ3 = rabbit_variable_queue:fold(fun (_M, _A) -> ok end, VQ2, AckTags), + VQ3. + test_queue_recover() -> Count = 2 * rabbit_queue_index:next_segment_boundary(0), {new, #amqqueue { pid = QPid, name = QName } = Q} = diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 8a3fd9d9..6dc65bab 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -641,13 +641,12 @@ ack(AckTags, State) -> fold(undefined, State, _AckTags) -> State; fold(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) -> - lists:foldl( - fun(SeqId, State1) -> - {MsgStatus, State2} = - read_msg(gb_trees:get(SeqId, PA), State1), - MsgFun(MsgStatus#msg_status.msg, SeqId), - State2 - end, State, AckTags). + a(lists:foldl(fun(SeqId, State1) -> + {MsgStatus, State2} = + read_msg(gb_trees:get(SeqId, PA), false, State1), + MsgFun(MsgStatus#msg_status.msg, SeqId), + State2 + end, State, AckTags)). requeue(AckTags, #vqstate { delta = Delta, q3 = Q3, @@ -837,6 +836,7 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, true = Len >= 0, true = PersistentCount >= 0, true = RamMsgCount >= 0, + true = RamMsgCount =< Len, State. @@ -1062,17 +1062,19 @@ queue_out(State = #vqstate { q4 = Q4 }) -> {{value, MsgStatus}, State #vqstate { q4 = Q4a }} end. +read_msg(MsgStatus, State) -> read_msg(MsgStatus, true, State). + read_msg(MsgStatus = #msg_status { msg = undefined, msg_id = MsgId, is_persistent = IsPersistent }, - State = #vqstate { ram_msg_count = RamMsgCount, - msg_store_clients = MSCState}) -> + CountDiskToRam, State = #vqstate { ram_msg_count = RamMsgCount, + msg_store_clients = MSCState}) -> {{ok, Msg = #basic_message {}}, MSCState1} = msg_store_read(MSCState, IsPersistent, MsgId), {MsgStatus #msg_status { msg = Msg }, - State #vqstate { ram_msg_count = RamMsgCount + 1, + State #vqstate { ram_msg_count = RamMsgCount + one_if(CountDiskToRam), msg_store_clients = MSCState1 }}; -read_msg(MsgStatus, State) -> +read_msg(MsgStatus, _CountDiskToRam, State) -> {MsgStatus, State}. internal_fetch(AckRequired, MsgStatus = #msg_status { |