diff options
author | Rob Harrop <rharrop@vmware.com> | 2010-09-28 13:31:37 +0100 |
---|---|---|
committer | Rob Harrop <rharrop@vmware.com> | 2010-09-28 13:31:37 +0100 |
commit | fd3581c6165e7e6356789f295d4910a6fc0330d3 (patch) | |
tree | 6802ee1f0b4422b285b49371e8d435112ab37edc | |
parent | d7d7d70cc8cd70a894e3802e4ffe5022ad236c1a (diff) | |
download | rabbitmq-server-fd3581c6165e7e6356789f295d4910a6fc0330d3.tar.gz |
added timer to collect expired messages
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 46 |
1 files changed, 33 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 955b607f..52663f15 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -64,7 +64,8 @@ rate_timer_ref, expiry_timer_ref, stats_timer, - ttl + ttl, + ttl_timer_ref }). -record(consumer, {tag, ack_required}). @@ -441,7 +442,7 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> BQS = BQ:publish(Message, msg_properties(State), State #q.backing_queue_state), - {false, NewState#q{backing_queue_state = BQS}} + {false, ensure_ttl_timer(NewState#q{backing_queue_state = BQS})} end. requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> @@ -460,17 +461,6 @@ fetch(AckRequired, State = #q{backing_queue_state = BQS, State#q{backing_queue_state = BQS1}} end. -drop_expired_messages(State = #q{ttl = undefined}) -> - State; -drop_expired_messages(State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> - Now = timer:now_diff(now(), {0,0,0}), - BQS1 = BQ:dropwhile( - fun (_Msg, _MsgProperties = #msg_properties{expiry=Expiry}) -> - Now > Expiry - end, BQS), - State #q{backing_queue_state = BQS1}. - add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). remove_consumer(ChPid, ConsumerTag, Queue) -> @@ -602,6 +592,33 @@ calculate_msg_expiry(_State = #q{ttl = Ttl}) -> Now = timer:now_diff(now(), {0,0,0}), Now + (Ttl * 1000). +drop_expired_messages(State = #q{ttl = undefined}) -> + State; +drop_expired_messages(State = #q{backing_queue_state = BQS, + backing_queue = BQ}) -> + Now = timer:now_diff(now(), {0,0,0}), + BQS1 = BQ:dropwhile( + fun (_Msg, _MsgProperties = #msg_properties{expiry=Expiry}) -> + Now > Expiry + end, BQS), + ensure_ttl_timer(State #q{backing_queue_state = BQS1}). + +ensure_ttl_timer(State = #q{backing_queue = BQ, + backing_queue_state = BQS, + ttl = Ttl, + ttl_timer_ref = undefined}) + when Ttl =/= undefined-> + case BQ:is_empty(BQS) of + true -> + State; + false -> + State#q{ttl_timer_ref = + timer:send_after(Ttl, self(), drop_expired)} + end; +ensure_ttl_timer(State) -> + State. + + infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(name, #q{q = #amqqueue{name = Name}}) -> Name; @@ -993,6 +1010,9 @@ handle_info(timeout, State = #q{backing_queue = BQ}) -> noreply(maybe_run_queue_via_backing_queue( fun (BQS) -> BQ:idle_timeout(BQS) end, State)); +handle_info(drop_expired, State) -> + noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined})); + handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; |