summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2012-11-26 12:07:29 +0000
committerTim Watson <tim@rabbitmq.com>2012-11-26 12:07:29 +0000
commit5818c2ad750183a9c15d06dbe1b392aff388ce93 (patch)
tree872b6d6d5e427aa0b5cfac7f035825cc299bf611
parentead36b74b15e2eefce4b66579e524c5bf5ed15b5 (diff)
parentee69523f13ef37026922ed973965e5e953827a3a (diff)
downloadrabbitmq-server-bug25309.tar.gz
merge stable into bug25309bug25309
-rw-r--r--src/rabbit_amqqueue_process.erl36
-rw-r--r--src/rabbit_tests.erl7
-rw-r--r--src/rabbit_variable_queue.erl24
3 files changed, 42 insertions, 25 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6f8c9305..f4459e45 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -486,9 +486,11 @@ deliver_msg_to_consumer(DeliverFun,
{Stop, State1}.
deliver_from_queue_deliver(AckRequired, State) ->
- {{Message, IsDelivered, AckTag, Remaining}, State1} =
+ {{Message, IsDelivered, AckTag, _Remaining}, State1} =
fetch(AckRequired, State),
- {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}.
+ State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ drop_expired_messages(State1),
+ {{Message, IsDelivered, AckTag}, BQ:is_empty(BQS), State2}.
confirm_messages([], State) ->
State;
@@ -1258,18 +1260,24 @@ handle_cast({set_maximum_since_use, Age}, State) ->
handle_cast({dead_letter, Msgs, Reason}, State = #q{dlx = XName}) ->
case rabbit_exchange:lookup(XName) of
{ok, X} ->
- noreply(lists:foldl(
- fun({Msg, AckTag}, State1 = #q{publish_seqno = SeqNo,
- unconfirmed = UC,
- queue_monitors = QMon}) ->
- QPids = dead_letter_publish(Msg, Reason, X,
- State1),
- UC1 = dtree:insert(SeqNo, QPids, AckTag, UC),
- QMons = pmon:monitor_all(QPids, QMon),
- State1#q{queue_monitors = QMons,
- publish_seqno = SeqNo + 1,
- unconfirmed = UC1}
- end, State, Msgs));
+ {AckImmediately, State2} =
+ lists:foldl(
+ fun({Msg, AckTag},
+ {Acks, State1 = #q{publish_seqno = SeqNo,
+ unconfirmed = UC,
+ queue_monitors = QMons}}) ->
+ case dead_letter_publish(Msg, Reason, X, State1) of
+ [] -> {[AckTag | Acks], State1};
+ QPids -> UC1 = dtree:insert(
+ SeqNo, QPids, AckTag, UC),
+ QMons1 = pmon:monitor_all(QPids, QMons),
+ {Acks,
+ State1#q{publish_seqno = SeqNo + 1,
+ unconfirmed = UC1,
+ queue_monitors = QMons1}}
+ end
+ end, {[], State}, Msgs),
+ cleanup_after_confirm(AckImmediately, State2);
{error, not_found} ->
cleanup_after_confirm([AckTag || {_, AckTag} <- Msgs], State)
end;
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 096f9490..983abf29 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2300,6 +2300,7 @@ test_variable_queue() ->
fun test_variable_queue_partial_segments_delta_thing/1,
fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1,
fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1,
+ fun test_variable_queue_fold_msg_on_disk/1,
fun test_dropwhile/1,
fun test_dropwhile_varying_ram_duration/1,
fun test_variable_queue_ack_limiting/1,
@@ -2515,6 +2516,12 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
+test_variable_queue_fold_msg_on_disk(VQ0) ->
+ VQ1 = variable_queue_publish(true, 1, VQ0),
+ {VQ2, AckTags} = variable_queue_fetch(1, true, false, 1, VQ1),
+ VQ3 = rabbit_variable_queue:fold(fun (_M, _A) -> ok end, VQ2, AckTags),
+ VQ3.
+
test_queue_recover() ->
Count = 2 * rabbit_queue_index:next_segment_boundary(0),
{new, #amqqueue { pid = QPid, name = QName } = Q} =
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8a3fd9d9..6dc65bab 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -641,13 +641,12 @@ ack(AckTags, State) ->
fold(undefined, State, _AckTags) ->
State;
fold(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) ->
- lists:foldl(
- fun(SeqId, State1) ->
- {MsgStatus, State2} =
- read_msg(gb_trees:get(SeqId, PA), State1),
- MsgFun(MsgStatus#msg_status.msg, SeqId),
- State2
- end, State, AckTags).
+ a(lists:foldl(fun(SeqId, State1) ->
+ {MsgStatus, State2} =
+ read_msg(gb_trees:get(SeqId, PA), false, State1),
+ MsgFun(MsgStatus#msg_status.msg, SeqId),
+ State2
+ end, State, AckTags)).
requeue(AckTags, #vqstate { delta = Delta,
q3 = Q3,
@@ -837,6 +836,7 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
true = Len >= 0,
true = PersistentCount >= 0,
true = RamMsgCount >= 0,
+ true = RamMsgCount =< Len,
State.
@@ -1062,17 +1062,19 @@ queue_out(State = #vqstate { q4 = Q4 }) ->
{{value, MsgStatus}, State #vqstate { q4 = Q4a }}
end.
+read_msg(MsgStatus, State) -> read_msg(MsgStatus, true, State).
+
read_msg(MsgStatus = #msg_status { msg = undefined,
msg_id = MsgId,
is_persistent = IsPersistent },
- State = #vqstate { ram_msg_count = RamMsgCount,
- msg_store_clients = MSCState}) ->
+ CountDiskToRam, State = #vqstate { ram_msg_count = RamMsgCount,
+ msg_store_clients = MSCState}) ->
{{ok, Msg = #basic_message {}}, MSCState1} =
msg_store_read(MSCState, IsPersistent, MsgId),
{MsgStatus #msg_status { msg = Msg },
- State #vqstate { ram_msg_count = RamMsgCount + 1,
+ State #vqstate { ram_msg_count = RamMsgCount + one_if(CountDiskToRam),
msg_store_clients = MSCState1 }};
-read_msg(MsgStatus, State) ->
+read_msg(MsgStatus, _CountDiskToRam, State) ->
{MsgStatus, State}.
internal_fetch(AckRequired, MsgStatus = #msg_status {