diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-04 18:38:12 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-04 18:38:12 +0000 |
commit | 6532a3ca4f8ec7b51e3810c4c2220aaeea681935 (patch) | |
tree | f1cc7ed8bcd701d3d421ebd20ae150732a9b60ce | |
parent | ca2611243ec8129bfc045345ef5ed2e0045c6ea5 (diff) | |
download | rabbitmq-server-6532a3ca4f8ec7b51e3810c4c2220aaeea681935.tar.gz |
some more control flow abstraction
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 47 |
1 files changed, 21 insertions, 26 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 66e48024..0bef1e4b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -711,28 +711,27 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) -> T -> now_micros() + T * 1000 end. -drop_expired_msgs(State = #q{dlx = DLX, - backing_queue_state = BQS, +drop_expired_msgs(State = #q{backing_queue_state = BQS, backing_queue = BQ }) -> Now = now_micros(), ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end, {Props, State1} = - case DLX of - undefined -> {Next, BQS1} = BQ:dropwhile(ExpirePred, BQS), - {Next, State#q{backing_queue_state = BQS1}}; - _ -> case rabbit_exchange:lookup(DLX) of - {ok, X} -> - dead_letter_expired_msgs(ExpirePred, X, State); - {error, not_found} -> - {Next, BQS1} = BQ:dropwhile(ExpirePred, BQS), - {Next, State#q{backing_queue_state = BQS1}} - end - end, + with_dlx( + State#q.dlx, + fun (X) -> dead_letter_expired_msgs(ExpirePred, X, State) end, + fun () -> {Next, BQS1} = BQ:dropwhile(ExpirePred, BQS), + {Next, State#q{backing_queue_state = BQS1}} end), ensure_ttl_timer(case Props of undefined -> undefined; #message_properties{expiry = Exp} -> Exp end, State1). +with_dlx(undefined, _With, Without) -> Without(); +with_dlx(DLX, With, Without) -> case rabbit_exchange:lookup(DLX) of + {ok, X} -> With(X); + {error, not_found} -> Without() + end. + dead_letter_expired_msgs(ExpirePred, X, State = #q{backing_queue = BQ}) -> dead_letter_msgs(fun (DLFun, Acc, BQS1) -> BQ:fetchwhile(ExpirePred, DLFun, Acc, BQS1) @@ -1221,19 +1220,15 @@ handle_cast({ack, AckTags, ChPid}, State) -> handle_cast({reject, AckTags, true, ChPid}, State) -> noreply(requeue(AckTags, ChPid, State)); -handle_cast({reject, AckTags, false, ChPid}, State = #q{dlx = undefined}) -> - noreply(ack(AckTags, ChPid, State)); - -handle_cast({reject, AckTags, false, ChPid}, State = #q{dlx = DLX}) -> - noreply(case rabbit_exchange:lookup(DLX) of - {ok, X} -> subtract_acks( - ChPid, AckTags, State, - fun (State1) -> - dead_letter_rejected_msgs( - AckTags, X, State1) - end); - {error, not_found} -> ack(AckTags, ChPid, State) - end); +handle_cast({reject, AckTags, false, ChPid}, State) -> + noreply(with_dlx( + State#q.dlx, + fun (X) -> subtract_acks(ChPid, AckTags, State, + fun (State1) -> + dead_letter_rejected_msgs( + AckTags, X, State1) + end) end, + fun () -> ack(AckTags, ChPid, State) end)); handle_cast(delete_immediately, State) -> stop(State); |