diff options
author | Rob Harrop <rharrop@vmware.com> | 2010-09-23 13:02:58 +0100 |
---|---|---|
committer | Rob Harrop <rharrop@vmware.com> | 2010-09-23 13:02:58 +0100 |
commit | 276a2b22a9d573c2a097b770ed9c06dc28bae061 (patch) | |
tree | 41cafabeab0918ded9057536f89981d0669ab6cc | |
parent | f8174a443dd985bbf0dac128b21c7d5d84a66499 (diff) | |
download | rabbitmq-server-276a2b22a9d573c2a097b770ed9c06dc28bae061.tar.gz |
invariable queue has dropwhile implementation now
-rw-r--r-- | src/rabbit_invariable_queue.erl | 33 |
1 files changed, 23 insertions, 10 deletions
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 4626b513..8eb6ebbd 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -118,18 +118,31 @@ publish_delivered(true, Msg = #basic_message { guid = Guid }, ok = persist_delivery(QName, IsDurable, false, Msg), {Guid, State #iv_state { pending_ack = store_ack(Msg, MsgProps, PA) }}. -dropwhile(Pred, State) -> - State. +dropwhile(_Pred, State = #iv_state { len = 0 }) -> + State; +dropwhile(Pred, State = #iv_state { queue = Q }) -> + {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q), + case Pred(Msg, MsgProps) of + true -> + {_, State1} = fetch_internal(false, Q1, Msg, MsgProps, IsDelivered, State), + dropwhile(Pred, State1); + false -> + State + end. fetch(_AckRequired, State = #iv_state { len = 0 }) -> {empty, State}; -fetch(AckRequired, State = #iv_state { len = Len, - queue = Q, - qname = QName, - durable = IsDurable, - pending_ack = PA }) -> - {{value, {Msg = #basic_message { guid = Guid }, MsgProps, IsDelivered}}, - Q1} = queue:out(Q), +fetch(AckRequired, State = #iv_state { queue = Q }) -> + {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q), + fetch_internal(AckRequired, Q1, Msg, MsgProps, IsDelivered, State). + +fetch_internal(AckRequired, Q1, + Msg = #basic_message {guid = Guid}, + MsgProps, IsDelivered, + State = #iv_state { len = Len, + qname = QName, + durable = IsDurable, + pending_ack = PA }) -> Len1 = Len - 1, ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), PA1 = store_ack(Msg, MsgProps, PA), @@ -139,7 +152,7 @@ fetch(AckRequired, State = #iv_state { len = Len, [Guid], PA1), {blank_ack, PA} end, - {{Msg, MsgProps, IsDelivered, AckTag, Len1}, + {{Msg, IsDelivered, AckTag, Len1}, State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}. ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable, |