summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rharrop@vmware.com>2010-09-28 13:31:37 +0100
committerRob Harrop <rharrop@vmware.com>2010-09-28 13:31:37 +0100
commitfd3581c6165e7e6356789f295d4910a6fc0330d3 (patch)
tree6802ee1f0b4422b285b49371e8d435112ab37edc
parentd7d7d70cc8cd70a894e3802e4ffe5022ad236c1a (diff)
downloadrabbitmq-server-fd3581c6165e7e6356789f295d4910a6fc0330d3.tar.gz
added timer to collect expired messages
-rw-r--r--src/rabbit_amqqueue_process.erl46
1 files changed, 33 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 955b607f..52663f15 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -64,7 +64,8 @@
rate_timer_ref,
expiry_timer_ref,
stats_timer,
- ttl
+ ttl,
+ ttl_timer_ref
}).
-record(consumer, {tag, ack_required}).
@@ -441,7 +442,7 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
BQS = BQ:publish(Message,
msg_properties(State),
State #q.backing_queue_state),
- {false, NewState#q{backing_queue_state = BQS}}
+ {false, ensure_ttl_timer(NewState#q{backing_queue_state = BQS})}
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
@@ -460,17 +461,6 @@ fetch(AckRequired, State = #q{backing_queue_state = BQS,
State#q{backing_queue_state = BQS1}}
end.
-drop_expired_messages(State = #q{ttl = undefined}) ->
- State;
-drop_expired_messages(State = #q{backing_queue_state = BQS,
- backing_queue = BQ}) ->
- Now = timer:now_diff(now(), {0,0,0}),
- BQS1 = BQ:dropwhile(
- fun (_Msg, _MsgProperties = #msg_properties{expiry=Expiry}) ->
- Now > Expiry
- end, BQS),
- State #q{backing_queue_state = BQS1}.
-
add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue).
remove_consumer(ChPid, ConsumerTag, Queue) ->
@@ -602,6 +592,33 @@ calculate_msg_expiry(_State = #q{ttl = Ttl}) ->
Now = timer:now_diff(now(), {0,0,0}),
Now + (Ttl * 1000).
+drop_expired_messages(State = #q{ttl = undefined}) ->
+ State;
+drop_expired_messages(State = #q{backing_queue_state = BQS,
+ backing_queue = BQ}) ->
+ Now = timer:now_diff(now(), {0,0,0}),
+ BQS1 = BQ:dropwhile(
+ fun (_Msg, _MsgProperties = #msg_properties{expiry=Expiry}) ->
+ Now > Expiry
+ end, BQS),
+ ensure_ttl_timer(State #q{backing_queue_state = BQS1}).
+
+ensure_ttl_timer(State = #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ ttl = Ttl,
+ ttl_timer_ref = undefined})
+ when Ttl =/= undefined->
+ case BQ:is_empty(BQS) of
+ true ->
+ State;
+ false ->
+ State#q{ttl_timer_ref =
+ timer:send_after(Ttl, self(), drop_expired)}
+ end;
+ensure_ttl_timer(State) ->
+ State.
+
+
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(name, #q{q = #amqqueue{name = Name}}) -> Name;
@@ -993,6 +1010,9 @@ handle_info(timeout, State = #q{backing_queue = BQ}) ->
noreply(maybe_run_queue_via_backing_queue(
fun (BQS) -> BQ:idle_timeout(BQS) end, State));
+handle_info(drop_expired, State) ->
+ noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined}));
+
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};