diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-11-16 12:24:53 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-11-16 12:24:53 +0000 |
commit | 37a2d0afce0e9b2709dc3bb0c39d8e594650efb1 (patch) | |
tree | 7bb18090bc8089ef87c78f39602df9b3476e072d | |
parent | 1eb8f51aaf43378d24ef94b5cefd9d317089d45a (diff) | |
parent | 61f9eb7b33cbaa05cb64f0158a3bd40bd9f0c050 (diff) | |
download | rabbitmq-server-37a2d0afce0e9b2709dc3bb0c39d8e594650efb1.tar.gz |
merge default into bug23625
-rw-r--r-- | src/rabbit_amqqueue.erl | 37 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 108 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 9 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 4 |
4 files changed, 69 insertions, 89 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 4684ad7c..be7c7867 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -284,8 +284,7 @@ store_queue(Q = #amqqueue{durable = false}) -> ok = mnesia:write(rabbit_queue, Q, write), ok. -policy_changed(Q1, Q2) -> - rabbit_mirror_queue_misc:update_mirrors(Q1, Q2). +policy_changed(Q1, Q2) -> rabbit_mirror_queue_misc:update_mirrors(Q1, Q2). start_queue_process(Node, Q) -> {ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]), @@ -398,10 +397,8 @@ check_declare_arguments(QueueName, Args) -> end || {Key, Fun} <- Checks], ok. -check_string_arg({longstr, _}, _Args) -> - ok; -check_string_arg({Type, _}, _) -> - {error, {unacceptable_type, Type}}. +check_string_arg({longstr, _}, _Args) -> ok; +check_string_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. check_int_arg({Type, _}, _) -> case lists:member(Type, ?INTEGER_ARG_TYPES) of @@ -427,11 +424,10 @@ check_dlxrk_arg({longstr, _}, Args) -> undefined -> {error, routing_key_but_no_dlx_defined}; _ -> ok end; -check_dlxrk_arg({Type, _}, _Args) -> +check_dlxrk_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. -list() -> - mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}). +list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}). list(VHostPath) -> mnesia:dirty_match_object( @@ -442,8 +438,7 @@ info_keys() -> rabbit_amqqueue_process:info_keys(). map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). -info(#amqqueue{ pid = QPid }) -> - delegate_call(QPid, info). +info(#amqqueue{ pid = QPid }) -> delegate_call(QPid, info). info(#amqqueue{ pid = QPid }, Items) -> case delegate_call(QPid, {info, Items}) of @@ -460,8 +455,7 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). %% the first place since a node failed). Therefore we keep poking at %% the list of queues until we were able to talk to a live process or %% the queue no longer exists. -force_event_refresh() -> - force_event_refresh([Q#amqqueue.name || Q <- list()]). +force_event_refresh() -> force_event_refresh([Q#amqqueue.name || Q <- list()]). force_event_refresh(QNames) -> Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)], @@ -478,8 +472,7 @@ force_event_refresh(QNames) -> wake_up(#amqqueue{pid = QPid}) -> gen_server2:cast(QPid, wake_up). -consumers(#amqqueue{ pid = QPid }) -> - delegate_call(QPid, consumers). +consumers(#amqqueue{ pid = QPid }) -> delegate_call(QPid, consumers). consumer_info_keys() -> ?CONSUMER_INFO_KEYS. @@ -493,8 +486,7 @@ consumers_all(VHostPath) -> {ChPid, ConsumerTag, AckRequired} <- consumers(Q)] end)). -stat(#amqqueue{pid = QPid}) -> - delegate_call(QPid, stat). +stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat). delete_immediately(QPids) -> [gen_server2:cast(QPid, delete_immediately) || QPid <- QPids], @@ -509,11 +501,9 @@ deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow). deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow). -requeue(QPid, MsgIds, ChPid) -> - delegate_call(QPid, {requeue, MsgIds, ChPid}). +requeue(QPid, MsgIds, ChPid) -> delegate_call(QPid, {requeue, MsgIds, ChPid}). -ack(QPid, MsgIds, ChPid) -> - delegate_cast(QPid, {ack, MsgIds, ChPid}). +ack(QPid, MsgIds, ChPid) -> delegate_cast(QPid, {ack, MsgIds, ChPid}). reject(QPid, MsgIds, Requeue, ChPid) -> delegate_cast(QPid, {reject, MsgIds, Requeue, ChPid}). @@ -555,8 +545,7 @@ notify_sent_queue_down(QPid) -> erase({consumer_credit_to, QPid}), ok. -unblock(QPid, ChPid) -> - delegate_cast(QPid, {unblock, ChPid}). +unblock(QPid, ChPid) -> delegate_cast(QPid, {unblock, ChPid}). flush_all(QPids, ChPid) -> delegate:invoke_no_result( @@ -599,7 +588,7 @@ set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). start_mirroring(QPid) -> ok = delegate_call(QPid, start_mirroring). -stop_mirroring(QPid) -> ok = delegate_call(QPid, stop_mirroring). +stop_mirroring(QPid) -> ok = delegate_call(QPid, stop_mirroring). on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 9bd465dd..3bad6864 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -585,6 +585,18 @@ fetch(AckRequired, State = #q{backing_queue = BQ, {Result, BQS1} = BQ:fetch(AckRequired, BQS), {Result, State#q{backing_queue_state = BQS1}}. +ack(AckTags, ChPid, State) -> + subtract_acks(ChPid, AckTags, State, + fun (State1 = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {_Guids, BQS1} = BQ:ack(AckTags, BQS), + State1#q{backing_queue_state = BQS1} + end). + +requeue(AckTags, ChPid, State) -> + subtract_acks(ChPid, AckTags, State, + fun (State1) -> requeue_and_run(AckTags, State1) end). + remove_consumer(ChPid, ConsumerTag, Queue) -> queue:filter(fun ({CP, #consumer{tag = CTag}}) -> (CP /= ChPid) or (CTag /= ConsumerTag) @@ -706,17 +718,18 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) -> T -> now_micros() + T * 1000 end. -drop_expired_messages(State = #q{backing_queue_state = BQS, +drop_expired_messages(State = #q{dlx = DLX, + backing_queue_state = BQS, backing_queue = BQ }) -> Now = now_micros(), - DLXFun = dead_letter_fun(expired, State), ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end, - {Props, BQS1} = case DLXFun of + {Props, BQS1} = case DLX of undefined -> {Next, undefined, BQS2} = BQ:dropwhile(ExpirePred, false, BQS), {Next, BQS2}; _ -> {Next, Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS), + DLXFun = dead_letter_fun(expired), DLXFun(Msgs), {Next, BQS2} end, @@ -744,17 +757,7 @@ ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef, ensure_ttl_timer(_Expiry, State) -> State. -ack_if_no_dlx(AckTags, State = #q{dlx = undefined, - backing_queue = BQ, - backing_queue_state = BQS }) -> - {_Guids, BQS1} = BQ:ack(AckTags, BQS), - State#q{backing_queue_state = BQS1}; -ack_if_no_dlx(_AckTags, State) -> - State. - -dead_letter_fun(_Reason, #q{dlx = undefined}) -> - undefined; -dead_letter_fun(Reason, _State) -> +dead_letter_fun(Reason) -> fun(Msgs) -> gen_server2:cast(self(), {dead_letter, Msgs, Reason}) end. dead_letter_publish(Msg, Reason, X, State = #q{publish_seqno = MsgSeqNo}) -> @@ -763,8 +766,8 @@ dead_letter_publish(Msg, Reason, X, State = #q{publish_seqno = MsgSeqNo}) -> {Queues, Cycles} = detect_dead_letter_cycles( DLMsg, rabbit_exchange:route(X, Delivery)), lists:foreach(fun log_cycle_once/1, Cycles), - QPids = rabbit_amqqueue:lookup(Queues), - {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery), + {_, DeliveredQPids} = rabbit_amqqueue:deliver( + rabbit_amqqueue:lookup(Queues), Delivery), DeliveredQPids. handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons, @@ -786,17 +789,16 @@ handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons, unconfirmed = UC1}) end. -stop_later(Reason, State) -> - stop_later(Reason, undefined, noreply, State). +stop(State) -> stop(undefined, noreply, State). -stop_later(Reason, From, Reply, State = #q{unconfirmed = UC}) -> +stop(From, Reply, State = #q{unconfirmed = UC}) -> case {dtree:is_empty(UC), Reply} of {true, noreply} -> - {stop, Reason, State}; + {stop, normal, State}; {true, _} -> - {stop, Reason, Reply, State}; + {stop, normal, Reply, State}; {false, _} -> - noreply(State#q{delayed_stop = {Reason, {From, Reply}}}) + noreply(State#q{delayed_stop = {From, Reply}}) end. cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS, @@ -807,11 +809,10 @@ cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS, State1 = State#q{backing_queue_state = BQS1}, case dtree:is_empty(UC) andalso DS =/= undefined of true -> case DS of - {_, {_, noreply}} -> ok; - {_, {From, Reply}} -> gen_server2:reply(From, Reply) + {_, noreply} -> ok; + {From, Reply} -> gen_server2:reply(From, Reply) end, - {Reason, _} = DS, - {stop, Reason, State1}; + {stop, normal, State1}; false -> noreply(State1) end. @@ -1043,10 +1044,11 @@ handle_call({notify_down, ChPid}, From, State) -> %% are no longer visible by the time we send a response to the %% client. The queue is ultimately deleted in terminate/2; if we %% return stop with a reply, terminate/2 will be called by - %% gen_server2 *before* the reply is sent. + %% gen_server2 *before* the reply is sent. FIXME: in case of a + %% delayed stop the reply is sent earlier. case handle_ch_down(ChPid, State) of {ok, State1} -> reply(ok, State1); - {stop, State1} -> stop_later(normal, From, ok, State1) + {stop, State1} -> stop(From, ok, State1) end; handle_call({basic_get, ChPid, NoAck}, _From, @@ -1120,7 +1122,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, State#q.active_consumers)}, case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); - true -> stop_later(normal, From, ok, State1) + true -> stop(From, ok, State1) end end; @@ -1136,8 +1138,7 @@ handle_call({delete, IfUnused, IfEmpty}, From, if IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State); IfUnused and not(IsUnused) -> reply({error, in_use}, State); - true -> stop_later(normal, From, - {ok, BQ:len(BQS)}, State) + true -> stop(From, {ok, BQ:len(BQS)}, State) end; handle_call(purge, _From, State = #q{backing_queue = BQ, @@ -1147,9 +1148,7 @@ handle_call(purge, _From, State = #q{backing_queue = BQ, handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), - noreply(subtract_acks( - ChPid, AckTags, State, - fun (State1) -> requeue_and_run(AckTags, State1) end)); + noreply(requeue(AckTags, ChPid, State)); handle_call(start_mirroring, _From, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> @@ -1211,36 +1210,27 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow}, noreply(deliver_or_enqueue(Delivery, Delivered, State1)); handle_cast({ack, AckTags, ChPid}, State) -> + noreply(ack(AckTags, ChPid, State)); + +handle_cast({reject, AckTags, true, ChPid}, State) -> + noreply(requeue(AckTags, ChPid, State)); + +handle_cast({reject, AckTags, false, ChPid}, State = #q{dlx = undefined}) -> + noreply(ack(AckTags, ChPid, State)); + +handle_cast({reject, AckTags, false, ChPid}, State) -> + DLXFun = dead_letter_fun(rejected), noreply(subtract_acks( ChPid, AckTags, State, fun (State1 = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - {_Guids, BQS1} = BQ:ack(AckTags, BQS), + BQS1 = BQ:fold(fun(M, A) -> DLXFun([{M, A}]) end, + BQS, AckTags), State1#q{backing_queue_state = BQS1} end)); -handle_cast({reject, AckTags, Requeue, ChPid}, State) -> - noreply(subtract_acks( - ChPid, AckTags, State, - case Requeue of - true -> fun (State1) -> requeue_and_run(AckTags, State1) end; - false -> fun (State1 = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - Fun = - case dead_letter_fun(rejected, State1) of - undefined -> undefined; - F -> fun(M, A) -> F([{M, A}]) - end - end, - BQS1 = BQ:fold(Fun, BQS, AckTags), - ack_if_no_dlx( - AckTags, - State1#q{backing_queue_state = BQS1}) - end - end)); - handle_cast(delete_immediately, State) -> - stop_later(normal, State); + stop(State); handle_cast({unblock, ChPid}, State) -> noreply( @@ -1316,7 +1306,7 @@ handle_info(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> handle_info(maybe_expire, State) -> case is_unused(State) of - true -> stop_later(normal, State); + true -> stop(State); false -> noreply(ensure_expiry_timer(State)) end; @@ -1338,12 +1328,12 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, %% match what people expect (see bug 21824). However we need this %% monitor-and-async- delete in case the connection goes away %% unexpectedly. - stop_later(normal, State); + stop(State); handle_info({'DOWN', _MonitorRef, process, DownPid, Reason}, State) -> case handle_ch_down(DownPid, State) of {ok, State1} -> handle_queue_down(DownPid, Reason, State1); - {stop, State1} -> stop_later(normal, State1) + {stop, State1} -> stop(State1) end; handle_info(update_ram_duration, State = #q{backing_queue = BQ, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 9dbfbdea..2afc4c36 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1126,12 +1126,13 @@ consumer_monitor(ConsumerTag, State end. -monitor_delivering_queue(true, _QPid, State) -> - State; -monitor_delivering_queue(false, QPid, State = #ch{queue_monitors = QMons, +monitor_delivering_queue(NoAck, QPid, State = #ch{queue_monitors = QMons, delivering_queues = DQ}) -> State#ch{queue_monitors = pmon:monitor(QPid, QMons), - delivering_queues = sets:add_element(QPid, DQ)}. + delivering_queues = case NoAck of + true -> DQ; + false -> sets:add_element(QPid, DQ) + end}. handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) -> case rabbit_misc:is_abnormal_exit(Reason) of diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 2f75ef2e..2b3bd027 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -315,8 +315,8 @@ update_mirrors(OldQ = #amqqueue{pid = QPid}, case {is_mirrored(OldQ), is_mirrored(NewQ)} of {false, false} -> ok; {true, false} -> rabbit_amqqueue:stop_mirroring(QPid); - {false, true} -> rabbit_amqqueue:start_mirroring(QPid); - {true, true} -> update_mirrors0(OldQ, NewQ) + {false, true} -> rabbit_amqqueue:start_mirroring(QPid); + {true, true} -> update_mirrors0(OldQ, NewQ) end. update_mirrors0(OldQ = #amqqueue{name = QName}, |