summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rharrop@vmware.com>2010-09-23 13:02:58 +0100
committerRob Harrop <rharrop@vmware.com>2010-09-23 13:02:58 +0100
commit276a2b22a9d573c2a097b770ed9c06dc28bae061 (patch)
tree41cafabeab0918ded9057536f89981d0669ab6cc
parentf8174a443dd985bbf0dac128b21c7d5d84a66499 (diff)
downloadrabbitmq-server-276a2b22a9d573c2a097b770ed9c06dc28bae061.tar.gz
invariable queue has dropwhile implementation now
-rw-r--r--src/rabbit_invariable_queue.erl33
1 files changed, 23 insertions, 10 deletions
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index 4626b513..8eb6ebbd 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -118,18 +118,31 @@ publish_delivered(true, Msg = #basic_message { guid = Guid },
ok = persist_delivery(QName, IsDurable, false, Msg),
{Guid, State #iv_state { pending_ack = store_ack(Msg, MsgProps, PA) }}.
-dropwhile(Pred, State) ->
- State.
+dropwhile(_Pred, State = #iv_state { len = 0 }) ->
+ State;
+dropwhile(Pred, State = #iv_state { queue = Q }) ->
+ {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q),
+ case Pred(Msg, MsgProps) of
+ true ->
+ {_, State1} = fetch_internal(false, Q1, Msg, MsgProps, IsDelivered, State),
+ dropwhile(Pred, State1);
+ false ->
+ State
+ end.
fetch(_AckRequired, State = #iv_state { len = 0 }) ->
{empty, State};
-fetch(AckRequired, State = #iv_state { len = Len,
- queue = Q,
- qname = QName,
- durable = IsDurable,
- pending_ack = PA }) ->
- {{value, {Msg = #basic_message { guid = Guid }, MsgProps, IsDelivered}},
- Q1} = queue:out(Q),
+fetch(AckRequired, State = #iv_state { queue = Q }) ->
+ {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q),
+ fetch_internal(AckRequired, Q1, Msg, MsgProps, IsDelivered, State).
+
+fetch_internal(AckRequired, Q1,
+ Msg = #basic_message {guid = Guid},
+ MsgProps, IsDelivered,
+ State = #iv_state { len = Len,
+ qname = QName,
+ durable = IsDurable,
+ pending_ack = PA }) ->
Len1 = Len - 1,
ok = persist_delivery(QName, IsDurable, IsDelivered, Msg),
PA1 = store_ack(Msg, MsgProps, PA),
@@ -139,7 +152,7 @@ fetch(AckRequired, State = #iv_state { len = Len,
[Guid], PA1),
{blank_ack, PA}
end,
- {{Msg, MsgProps, IsDelivered, AckTag, Len1},
+ {{Msg, IsDelivered, AckTag, Len1},
State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}.
ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable,