summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-11-16 12:24:53 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-11-16 12:24:53 +0000
commit37a2d0afce0e9b2709dc3bb0c39d8e594650efb1 (patch)
tree7bb18090bc8089ef87c78f39602df9b3476e072d
parent1eb8f51aaf43378d24ef94b5cefd9d317089d45a (diff)
parent61f9eb7b33cbaa05cb64f0158a3bd40bd9f0c050 (diff)
downloadrabbitmq-server-37a2d0afce0e9b2709dc3bb0c39d8e594650efb1.tar.gz
merge default into bug23625
-rw-r--r--src/rabbit_amqqueue.erl37
-rw-r--r--src/rabbit_amqqueue_process.erl108
-rw-r--r--src/rabbit_channel.erl9
-rw-r--r--src/rabbit_mirror_queue_misc.erl4
4 files changed, 69 insertions, 89 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 4684ad7c..be7c7867 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -284,8 +284,7 @@ store_queue(Q = #amqqueue{durable = false}) ->
ok = mnesia:write(rabbit_queue, Q, write),
ok.
-policy_changed(Q1, Q2) ->
- rabbit_mirror_queue_misc:update_mirrors(Q1, Q2).
+policy_changed(Q1, Q2) -> rabbit_mirror_queue_misc:update_mirrors(Q1, Q2).
start_queue_process(Node, Q) ->
{ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]),
@@ -398,10 +397,8 @@ check_declare_arguments(QueueName, Args) ->
end || {Key, Fun} <- Checks],
ok.
-check_string_arg({longstr, _}, _Args) ->
- ok;
-check_string_arg({Type, _}, _) ->
- {error, {unacceptable_type, Type}}.
+check_string_arg({longstr, _}, _Args) -> ok;
+check_string_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}.
check_int_arg({Type, _}, _) ->
case lists:member(Type, ?INTEGER_ARG_TYPES) of
@@ -427,11 +424,10 @@ check_dlxrk_arg({longstr, _}, Args) ->
undefined -> {error, routing_key_but_no_dlx_defined};
_ -> ok
end;
-check_dlxrk_arg({Type, _}, _Args) ->
+check_dlxrk_arg({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
-list() ->
- mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}).
+list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}).
list(VHostPath) ->
mnesia:dirty_match_object(
@@ -442,8 +438,7 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
-info(#amqqueue{ pid = QPid }) ->
- delegate_call(QPid, info).
+info(#amqqueue{ pid = QPid }) -> delegate_call(QPid, info).
info(#amqqueue{ pid = QPid }, Items) ->
case delegate_call(QPid, {info, Items}) of
@@ -460,8 +455,7 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
%% the first place since a node failed). Therefore we keep poking at
%% the list of queues until we were able to talk to a live process or
%% the queue no longer exists.
-force_event_refresh() ->
- force_event_refresh([Q#amqqueue.name || Q <- list()]).
+force_event_refresh() -> force_event_refresh([Q#amqqueue.name || Q <- list()]).
force_event_refresh(QNames) ->
Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)],
@@ -478,8 +472,7 @@ force_event_refresh(QNames) ->
wake_up(#amqqueue{pid = QPid}) -> gen_server2:cast(QPid, wake_up).
-consumers(#amqqueue{ pid = QPid }) ->
- delegate_call(QPid, consumers).
+consumers(#amqqueue{ pid = QPid }) -> delegate_call(QPid, consumers).
consumer_info_keys() -> ?CONSUMER_INFO_KEYS.
@@ -493,8 +486,7 @@ consumers_all(VHostPath) ->
{ChPid, ConsumerTag, AckRequired} <- consumers(Q)]
end)).
-stat(#amqqueue{pid = QPid}) ->
- delegate_call(QPid, stat).
+stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat).
delete_immediately(QPids) ->
[gen_server2:cast(QPid, delete_immediately) || QPid <- QPids],
@@ -509,11 +501,9 @@ deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow).
deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow).
-requeue(QPid, MsgIds, ChPid) ->
- delegate_call(QPid, {requeue, MsgIds, ChPid}).
+requeue(QPid, MsgIds, ChPid) -> delegate_call(QPid, {requeue, MsgIds, ChPid}).
-ack(QPid, MsgIds, ChPid) ->
- delegate_cast(QPid, {ack, MsgIds, ChPid}).
+ack(QPid, MsgIds, ChPid) -> delegate_cast(QPid, {ack, MsgIds, ChPid}).
reject(QPid, MsgIds, Requeue, ChPid) ->
delegate_cast(QPid, {reject, MsgIds, Requeue, ChPid}).
@@ -555,8 +545,7 @@ notify_sent_queue_down(QPid) ->
erase({consumer_credit_to, QPid}),
ok.
-unblock(QPid, ChPid) ->
- delegate_cast(QPid, {unblock, ChPid}).
+unblock(QPid, ChPid) -> delegate_cast(QPid, {unblock, ChPid}).
flush_all(QPids, ChPid) ->
delegate:invoke_no_result(
@@ -599,7 +588,7 @@ set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
start_mirroring(QPid) -> ok = delegate_call(QPid, start_mirroring).
-stop_mirroring(QPid) -> ok = delegate_call(QPid, stop_mirroring).
+stop_mirroring(QPid) -> ok = delegate_call(QPid, stop_mirroring).
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 9bd465dd..3bad6864 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -585,6 +585,18 @@ fetch(AckRequired, State = #q{backing_queue = BQ,
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
{Result, State#q{backing_queue_state = BQS1}}.
+ack(AckTags, ChPid, State) ->
+ subtract_acks(ChPid, AckTags, State,
+ fun (State1 = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ State1#q{backing_queue_state = BQS1}
+ end).
+
+requeue(AckTags, ChPid, State) ->
+ subtract_acks(ChPid, AckTags, State,
+ fun (State1) -> requeue_and_run(AckTags, State1) end).
+
remove_consumer(ChPid, ConsumerTag, Queue) ->
queue:filter(fun ({CP, #consumer{tag = CTag}}) ->
(CP /= ChPid) or (CTag /= ConsumerTag)
@@ -706,17 +718,18 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
T -> now_micros() + T * 1000
end.
-drop_expired_messages(State = #q{backing_queue_state = BQS,
+drop_expired_messages(State = #q{dlx = DLX,
+ backing_queue_state = BQS,
backing_queue = BQ }) ->
Now = now_micros(),
- DLXFun = dead_letter_fun(expired, State),
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
- {Props, BQS1} = case DLXFun of
+ {Props, BQS1} = case DLX of
undefined -> {Next, undefined, BQS2} =
BQ:dropwhile(ExpirePred, false, BQS),
{Next, BQS2};
_ -> {Next, Msgs, BQS2} =
BQ:dropwhile(ExpirePred, true, BQS),
+ DLXFun = dead_letter_fun(expired),
DLXFun(Msgs),
{Next, BQS2}
end,
@@ -744,17 +757,7 @@ ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef,
ensure_ttl_timer(_Expiry, State) ->
State.
-ack_if_no_dlx(AckTags, State = #q{dlx = undefined,
- backing_queue = BQ,
- backing_queue_state = BQS }) ->
- {_Guids, BQS1} = BQ:ack(AckTags, BQS),
- State#q{backing_queue_state = BQS1};
-ack_if_no_dlx(_AckTags, State) ->
- State.
-
-dead_letter_fun(_Reason, #q{dlx = undefined}) ->
- undefined;
-dead_letter_fun(Reason, _State) ->
+dead_letter_fun(Reason) ->
fun(Msgs) -> gen_server2:cast(self(), {dead_letter, Msgs, Reason}) end.
dead_letter_publish(Msg, Reason, X, State = #q{publish_seqno = MsgSeqNo}) ->
@@ -763,8 +766,8 @@ dead_letter_publish(Msg, Reason, X, State = #q{publish_seqno = MsgSeqNo}) ->
{Queues, Cycles} = detect_dead_letter_cycles(
DLMsg, rabbit_exchange:route(X, Delivery)),
lists:foreach(fun log_cycle_once/1, Cycles),
- QPids = rabbit_amqqueue:lookup(Queues),
- {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery),
+ {_, DeliveredQPids} = rabbit_amqqueue:deliver(
+ rabbit_amqqueue:lookup(Queues), Delivery),
DeliveredQPids.
handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
@@ -786,17 +789,16 @@ handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
unconfirmed = UC1})
end.
-stop_later(Reason, State) ->
- stop_later(Reason, undefined, noreply, State).
+stop(State) -> stop(undefined, noreply, State).
-stop_later(Reason, From, Reply, State = #q{unconfirmed = UC}) ->
+stop(From, Reply, State = #q{unconfirmed = UC}) ->
case {dtree:is_empty(UC), Reply} of
{true, noreply} ->
- {stop, Reason, State};
+ {stop, normal, State};
{true, _} ->
- {stop, Reason, Reply, State};
+ {stop, normal, Reply, State};
{false, _} ->
- noreply(State#q{delayed_stop = {Reason, {From, Reply}}})
+ noreply(State#q{delayed_stop = {From, Reply}})
end.
cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
@@ -807,11 +809,10 @@ cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
State1 = State#q{backing_queue_state = BQS1},
case dtree:is_empty(UC) andalso DS =/= undefined of
true -> case DS of
- {_, {_, noreply}} -> ok;
- {_, {From, Reply}} -> gen_server2:reply(From, Reply)
+ {_, noreply} -> ok;
+ {From, Reply} -> gen_server2:reply(From, Reply)
end,
- {Reason, _} = DS,
- {stop, Reason, State1};
+ {stop, normal, State1};
false -> noreply(State1)
end.
@@ -1043,10 +1044,11 @@ handle_call({notify_down, ChPid}, From, State) ->
%% are no longer visible by the time we send a response to the
%% client. The queue is ultimately deleted in terminate/2; if we
%% return stop with a reply, terminate/2 will be called by
- %% gen_server2 *before* the reply is sent.
+ %% gen_server2 *before* the reply is sent. FIXME: in case of a
+ %% delayed stop the reply is sent earlier.
case handle_ch_down(ChPid, State) of
{ok, State1} -> reply(ok, State1);
- {stop, State1} -> stop_later(normal, From, ok, State1)
+ {stop, State1} -> stop(From, ok, State1)
end;
handle_call({basic_get, ChPid, NoAck}, _From,
@@ -1120,7 +1122,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
State#q.active_consumers)},
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
- true -> stop_later(normal, From, ok, State1)
+ true -> stop(From, ok, State1)
end
end;
@@ -1136,8 +1138,7 @@ handle_call({delete, IfUnused, IfEmpty}, From,
if
IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State);
IfUnused and not(IsUnused) -> reply({error, in_use}, State);
- true -> stop_later(normal, From,
- {ok, BQ:len(BQS)}, State)
+ true -> stop(From, {ok, BQ:len(BQS)}, State)
end;
handle_call(purge, _From, State = #q{backing_queue = BQ,
@@ -1147,9 +1148,7 @@ handle_call(purge, _From, State = #q{backing_queue = BQ,
handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
- noreply(subtract_acks(
- ChPid, AckTags, State,
- fun (State1) -> requeue_and_run(AckTags, State1) end));
+ noreply(requeue(AckTags, ChPid, State));
handle_call(start_mirroring, _From, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
@@ -1211,36 +1210,27 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
noreply(deliver_or_enqueue(Delivery, Delivered, State1));
handle_cast({ack, AckTags, ChPid}, State) ->
+ noreply(ack(AckTags, ChPid, State));
+
+handle_cast({reject, AckTags, true, ChPid}, State) ->
+ noreply(requeue(AckTags, ChPid, State));
+
+handle_cast({reject, AckTags, false, ChPid}, State = #q{dlx = undefined}) ->
+ noreply(ack(AckTags, ChPid, State));
+
+handle_cast({reject, AckTags, false, ChPid}, State) ->
+ DLXFun = dead_letter_fun(rejected),
noreply(subtract_acks(
ChPid, AckTags, State,
fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ BQS1 = BQ:fold(fun(M, A) -> DLXFun([{M, A}]) end,
+ BQS, AckTags),
State1#q{backing_queue_state = BQS1}
end));
-handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
- noreply(subtract_acks(
- ChPid, AckTags, State,
- case Requeue of
- true -> fun (State1) -> requeue_and_run(AckTags, State1) end;
- false -> fun (State1 = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- Fun =
- case dead_letter_fun(rejected, State1) of
- undefined -> undefined;
- F -> fun(M, A) -> F([{M, A}])
- end
- end,
- BQS1 = BQ:fold(Fun, BQS, AckTags),
- ack_if_no_dlx(
- AckTags,
- State1#q{backing_queue_state = BQS1})
- end
- end));
-
handle_cast(delete_immediately, State) ->
- stop_later(normal, State);
+ stop(State);
handle_cast({unblock, ChPid}, State) ->
noreply(
@@ -1316,7 +1306,7 @@ handle_info(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
handle_info(maybe_expire, State) ->
case is_unused(State) of
- true -> stop_later(normal, State);
+ true -> stop(State);
false -> noreply(ensure_expiry_timer(State))
end;
@@ -1338,12 +1328,12 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
%% match what people expect (see bug 21824). However we need this
%% monitor-and-async- delete in case the connection goes away
%% unexpectedly.
- stop_later(normal, State);
+ stop(State);
handle_info({'DOWN', _MonitorRef, process, DownPid, Reason}, State) ->
case handle_ch_down(DownPid, State) of
{ok, State1} -> handle_queue_down(DownPid, Reason, State1);
- {stop, State1} -> stop_later(normal, State1)
+ {stop, State1} -> stop(State1)
end;
handle_info(update_ram_duration, State = #q{backing_queue = BQ,
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 9dbfbdea..2afc4c36 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1126,12 +1126,13 @@ consumer_monitor(ConsumerTag,
State
end.
-monitor_delivering_queue(true, _QPid, State) ->
- State;
-monitor_delivering_queue(false, QPid, State = #ch{queue_monitors = QMons,
+monitor_delivering_queue(NoAck, QPid, State = #ch{queue_monitors = QMons,
delivering_queues = DQ}) ->
State#ch{queue_monitors = pmon:monitor(QPid, QMons),
- delivering_queues = sets:add_element(QPid, DQ)}.
+ delivering_queues = case NoAck of
+ true -> DQ;
+ false -> sets:add_element(QPid, DQ)
+ end}.
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) ->
case rabbit_misc:is_abnormal_exit(Reason) of
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 2f75ef2e..2b3bd027 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -315,8 +315,8 @@ update_mirrors(OldQ = #amqqueue{pid = QPid},
case {is_mirrored(OldQ), is_mirrored(NewQ)} of
{false, false} -> ok;
{true, false} -> rabbit_amqqueue:stop_mirroring(QPid);
- {false, true} -> rabbit_amqqueue:start_mirroring(QPid);
- {true, true} -> update_mirrors0(OldQ, NewQ)
+ {false, true} -> rabbit_amqqueue:start_mirroring(QPid);
+ {true, true} -> update_mirrors0(OldQ, NewQ)
end.
update_mirrors0(OldQ = #amqqueue{name = QName},