summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-04 18:38:12 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-04 18:38:12 +0000
commit6532a3ca4f8ec7b51e3810c4c2220aaeea681935 (patch)
treef1cc7ed8bcd701d3d421ebd20ae150732a9b60ce
parentca2611243ec8129bfc045345ef5ed2e0045c6ea5 (diff)
downloadrabbitmq-server-6532a3ca4f8ec7b51e3810c4c2220aaeea681935.tar.gz
some more control flow abstraction
-rw-r--r--src/rabbit_amqqueue_process.erl47
1 files changed, 21 insertions, 26 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 66e48024..0bef1e4b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -711,28 +711,27 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
T -> now_micros() + T * 1000
end.
-drop_expired_msgs(State = #q{dlx = DLX,
- backing_queue_state = BQS,
+drop_expired_msgs(State = #q{backing_queue_state = BQS,
backing_queue = BQ }) ->
Now = now_micros(),
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
{Props, State1} =
- case DLX of
- undefined -> {Next, BQS1} = BQ:dropwhile(ExpirePred, BQS),
- {Next, State#q{backing_queue_state = BQS1}};
- _ -> case rabbit_exchange:lookup(DLX) of
- {ok, X} ->
- dead_letter_expired_msgs(ExpirePred, X, State);
- {error, not_found} ->
- {Next, BQS1} = BQ:dropwhile(ExpirePred, BQS),
- {Next, State#q{backing_queue_state = BQS1}}
- end
- end,
+ with_dlx(
+ State#q.dlx,
+ fun (X) -> dead_letter_expired_msgs(ExpirePred, X, State) end,
+ fun () -> {Next, BQS1} = BQ:dropwhile(ExpirePred, BQS),
+ {Next, State#q{backing_queue_state = BQS1}} end),
ensure_ttl_timer(case Props of
undefined -> undefined;
#message_properties{expiry = Exp} -> Exp
end, State1).
+with_dlx(undefined, _With, Without) -> Without();
+with_dlx(DLX, With, Without) -> case rabbit_exchange:lookup(DLX) of
+ {ok, X} -> With(X);
+ {error, not_found} -> Without()
+ end.
+
dead_letter_expired_msgs(ExpirePred, X, State = #q{backing_queue = BQ}) ->
dead_letter_msgs(fun (DLFun, Acc, BQS1) ->
BQ:fetchwhile(ExpirePred, DLFun, Acc, BQS1)
@@ -1221,19 +1220,15 @@ handle_cast({ack, AckTags, ChPid}, State) ->
handle_cast({reject, AckTags, true, ChPid}, State) ->
noreply(requeue(AckTags, ChPid, State));
-handle_cast({reject, AckTags, false, ChPid}, State = #q{dlx = undefined}) ->
- noreply(ack(AckTags, ChPid, State));
-
-handle_cast({reject, AckTags, false, ChPid}, State = #q{dlx = DLX}) ->
- noreply(case rabbit_exchange:lookup(DLX) of
- {ok, X} -> subtract_acks(
- ChPid, AckTags, State,
- fun (State1) ->
- dead_letter_rejected_msgs(
- AckTags, X, State1)
- end);
- {error, not_found} -> ack(AckTags, ChPid, State)
- end);
+handle_cast({reject, AckTags, false, ChPid}, State) ->
+ noreply(with_dlx(
+ State#q.dlx,
+ fun (X) -> subtract_acks(ChPid, AckTags, State,
+ fun (State1) ->
+ dead_letter_rejected_msgs(
+ AckTags, X, State1)
+ end) end,
+ fun () -> ack(AckTags, ChPid, State) end));
handle_cast(delete_immediately, State) ->
stop(State);