diff options
author | Matthias Radestock <matthias@lshift.net> | 2009-01-19 22:19:56 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2009-01-19 22:19:56 +0000 |
commit | bbb71f861fe3cfd231d788d97b897f18d49feaf1 (patch) | |
tree | fc9acf36e8e40d5c8a32fd7055f18c8f02a0a247 | |
parent | 7b4ef01338bece10ade9a8b79a121a4c9babe18d (diff) | |
parent | 68d915da91dd7acd1f134a016feffb87ea7e3626 (diff) | |
download | rabbitmq-server-bbb71f861fe3cfd231d788d97b897f18d49feaf1.tar.gz |
merge default into bug20173
-rw-r--r-- | docs/rabbitmqctl.1.pod | 12 | ||||
-rw-r--r-- | src/buffering_proxy.erl | 108 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 46 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 14 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 182 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 1 | ||||
-rw-r--r-- | src/rabbit_router.erl | 4 |
7 files changed, 126 insertions, 241 deletions
diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod index b9edd584..692c4dc5 100644 --- a/docs/rabbitmqctl.1.pod +++ b/docs/rabbitmqctl.1.pod @@ -150,19 +150,17 @@ pid Erlang process identifier associated with the queue messages_ready - number of ready messages + number of messages ready to be delivered to clients messages_unacknowledged - number of unacknowledged messages + number of messages delivered to clients but not yet acknowledged messages_uncommitted - number of uncommitted messages - -messages - sum of ready, unacknowledged and uncommitted messages + number of messages published in as yet uncommitted transactions acks_uncommitted - number of uncommitted acknowledgements + number of acknowledgements received in as yet uncommitted + transactions consumers number of consumers diff --git a/src/buffering_proxy.erl b/src/buffering_proxy.erl deleted file mode 100644 index 344b719a..00000000 --- a/src/buffering_proxy.erl +++ /dev/null @@ -1,108 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(buffering_proxy). - --export([start_link/2]). - -%% internal - --export([mainloop/4, drain/2]). --export([proxy_loop/3]). - --define(HIBERNATE_AFTER, 5000). - -%%---------------------------------------------------------------------------- - -start_link(M, A) -> - spawn_link( - fun () -> process_flag(trap_exit, true), - ProxyPid = self(), - Ref = make_ref(), - Pid = spawn_link( - fun () -> ProxyPid ! Ref, - mainloop(ProxyPid, Ref, M, - M:init(ProxyPid, A)) end), - proxy_loop(Ref, Pid, empty) - end). - -%%---------------------------------------------------------------------------- - -mainloop(ProxyPid, Ref, M, State) -> - NewState = - receive - {Ref, Messages} -> - NewSt = - lists:foldl(fun (Msg, S) -> - drain(M, M:handle_message(Msg, S)) - end, State, lists:reverse(Messages)), - ProxyPid ! Ref, - NewSt; - Msg -> M:handle_message(Msg, State) - after ?HIBERNATE_AFTER -> - erlang:hibernate(?MODULE, mainloop, - [ProxyPid, Ref, M, State]) - end, - ?MODULE:mainloop(ProxyPid, Ref, M, NewState). - -drain(M, State) -> - receive - Msg -> ?MODULE:drain(M, M:handle_message(Msg, State)) - after 0 -> - State - end. - -proxy_loop(Ref, Pid, State) -> - receive - Ref -> - ?MODULE:proxy_loop( - Ref, Pid, - case State of - empty -> waiting; - waiting -> exit(duplicate_next); - Messages -> Pid ! {Ref, Messages}, empty - end); - {'EXIT', Pid, Reason} -> - exit(Reason); - {'EXIT', _, Reason} -> - exit(Pid, Reason), - ?MODULE:proxy_loop(Ref, Pid, State); - Msg -> - ?MODULE:proxy_loop( - Ref, Pid, - case State of - empty -> [Msg]; - waiting -> Pid ! {Ref, [Msg]}, empty; - Messages -> [Msg | Messages] - end) - after ?HIBERNATE_AFTER -> - erlang:hibernate(?MODULE, proxy_loop, [Ref, Pid, State]) - end. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 3b0bf12c..abbdce66 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -43,7 +43,7 @@ -export([on_node_down/1]). -import(mnesia). --import(gen_server). +-import(gen_server2). -import(lists). -import(queue). @@ -199,10 +199,10 @@ list(VHostPath) -> map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> - gen_server:call(QPid, info). + gen_server2:call(QPid, info). info(#amqqueue{ pid = QPid }, Items) -> - case gen_server:call(QPid, {info, Items}) of + case gen_server2:call(QPid, {info, Items}) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -211,45 +211,45 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). -stat(#amqqueue{pid = QPid}) -> gen_server:call(QPid, stat). +stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat). stat_all() -> lists:map(fun stat/1, rabbit_misc:dirty_read_all(amqqueue)). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> - gen_server:call(QPid, {delete, IfUnused, IfEmpty}). + gen_server2:call(QPid, {delete, IfUnused, IfEmpty}). -purge(#amqqueue{ pid = QPid }) -> gen_server:call(QPid, purge). +purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge). deliver(_IsMandatory, true, Txn, Message, QPid) -> - gen_server:call(QPid, {deliver_immediately, Txn, Message}); + gen_server2:call(QPid, {deliver_immediately, Txn, Message}); deliver(true, _IsImmediate, Txn, Message, QPid) -> - gen_server:call(QPid, {deliver, Txn, Message}), + gen_server2:call(QPid, {deliver, Txn, Message}), true; deliver(false, _IsImmediate, Txn, Message, QPid) -> - gen_server:cast(QPid, {deliver, Txn, Message}), + gen_server2:cast(QPid, {deliver, Txn, Message}), true. redeliver(QPid, Messages) -> - gen_server:cast(QPid, {redeliver, Messages}). + gen_server2:cast(QPid, {redeliver, Messages}). requeue(QPid, MsgIds, ChPid) -> - gen_server:cast(QPid, {requeue, MsgIds, ChPid}). + gen_server2:cast(QPid, {requeue, MsgIds, ChPid}). ack(QPid, Txn, MsgIds, ChPid) -> - gen_server:cast(QPid, {ack, Txn, MsgIds, ChPid}). + gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}). commit_all(QPids, Txn) -> Timeout = length(QPids) * ?CALL_TIMEOUT, safe_pmap_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end, + fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, Timeout) end, QPids). rollback_all(QPids, Txn) -> safe_pmap_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end, + fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn}) end, QPids). notify_down_all(QPids, ChPid) -> @@ -258,34 +258,34 @@ notify_down_all(QPids, ChPid) -> %% we don't care if the queue process has terminated in the %% meantime fun (_) -> ok end, - fun (QPid) -> gen_server:call(QPid, {notify_down, ChPid}, Timeout) end, + fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, Timeout) end, QPids). limit_all(QPids, ChPid, LimiterPid) -> safe_pmap_ok( fun (_) -> ok end, - fun (QPid) -> gen_server:cast(QPid, {limit, ChPid, LimiterPid}) end, + fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end, QPids). claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> - gen_server:call(QPid, {claim_queue, ReaderPid}). + gen_server2:call(QPid, {claim_queue, ReaderPid}). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> - gen_server:call(QPid, {basic_get, ChPid, NoAck}). + gen_server2:call(QPid, {basic_get, ChPid, NoAck}). basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> - gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, - LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}). + gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, + LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> - ok = gen_server:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). + ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). notify_sent(QPid, ChPid) -> - gen_server:cast(QPid, {notify_sent, ChPid}). + gen_server2:cast(QPid, {notify_sent, ChPid}). unblock(QPid, ChPid) -> - gen_server:cast(QPid, {unblock, ChPid}). + gen_server2:cast(QPid, {unblock, ChPid}). internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 586e05ae..c390b2b7 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -33,7 +33,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --behaviour(gen_server). +-behaviour(gen_server2). -define(UNSENT_MESSAGE_LIMIT, 100). -define(HIBERNATE_AFTER, 1000). @@ -86,7 +86,7 @@ %%---------------------------------------------------------------------------- start_link(Q) -> - gen_server:start_link(?MODULE, Q, []). + gen_server2:start_link(?MODULE, Q, []). %%---------------------------------------------------------------------------- @@ -514,8 +514,8 @@ i(messages_uncommitted, _) -> #tx{pending_messages = Pending} <- all_tx_record()]); i(messages, State) -> lists:sum([i(Item, State) || Item <- [messages_ready, - messages_unacknowledged, - messages_uncommitted]]); + messages_unacknowledged, + messages_uncommitted]]); i(acks_uncommitted, _) -> lists:sum([length(Pending) || #tx{pending_acks = Pending} <- all_tx_record()]); @@ -566,14 +566,14 @@ handle_call({deliver, Txn, Message}, _From, State) -> handle_call({commit, Txn}, From, State) -> ok = commit_work(Txn, qname(State)), %% optimisation: we reply straight away so the sender can continue - gen_server:reply(From, ok), + gen_server2:reply(From, ok), NewState = process_pending(Txn, State), erase_tx(Txn), noreply(NewState); handle_call({notify_down, ChPid}, From, State) -> %% optimisation: we reply straight away so the sender can continue - gen_server:reply(From, ok), + gen_server2:reply(From, ok), handle_ch_down(ChPid, State); handle_call({basic_get, ChPid, NoAck}, _From, @@ -800,7 +800,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_info(timeout, State) -> %% TODO: Once we drop support for R11B-5, we can change this to %% {noreply, State, hibernate}; - proc_lib:hibernate(gen_server, enter_loop, [?MODULE, [], State]); + proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]); handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ae502061..fd9bb926 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -33,18 +33,21 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). +-behaviour(gen_server2). + -export([start_link/4, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, conserve_memory/2]). -%% callbacks --export([init/2, handle_message/2]). +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). --record(ch, {state, proxy_pid, reader_pid, writer_pid, limiter_pid, +-record(ch, {state, reader_pid, writer_pid, limiter_pid, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, consumer_mapping}). +-define(HIBERNATE_AFTER, 1000). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -62,110 +65,102 @@ %%---------------------------------------------------------------------------- start_link(ReaderPid, WriterPid, Username, VHost) -> - buffering_proxy:start_link(?MODULE, [ReaderPid, WriterPid, - Username, VHost]). + {ok, Pid} = gen_server2:start_link( + ?MODULE, [ReaderPid, WriterPid, Username, VHost], []), + Pid. do(Pid, Method) -> do(Pid, Method, none). do(Pid, Method, Content) -> - Pid ! {method, Method, Content}, - ok. + gen_server2:cast(Pid, {method, Method, Content}). shutdown(Pid) -> - Pid ! terminate, - ok. + gen_server2:cast(Pid, terminate). send_command(Pid, Msg) -> - Pid ! {command, Msg}, - ok. + gen_server2:cast(Pid, {command, Msg}). deliver(Pid, ConsumerTag, AckRequired, Msg) -> - Pid ! {deliver, ConsumerTag, AckRequired, Msg}, - ok. + gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). conserve_memory(Pid, Conserve) -> - Pid ! {conserve_memory, Conserve}, - ok. + gen_server2:cast(Pid, {conserve_memory, Conserve}). %%--------------------------------------------------------------------------- -init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> +init([ReaderPid, WriterPid, Username, VHost]) -> process_flag(trap_exit, true), link(WriterPid), - %% this is bypassing the proxy so alarms can "jump the queue" and - %% be handled promptly rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), - #ch{state = starting, - proxy_pid = ProxyPid, - reader_pid = ReaderPid, - writer_pid = WriterPid, - limiter_pid = undefined, - transaction_id = none, - tx_participants = sets:new(), - next_tag = 1, - uncommitted_ack_q = queue:new(), - unacked_message_q = queue:new(), - username = Username, - virtual_host = VHost, - most_recently_declared_queue = <<>>, - consumer_mapping = dict:new()}. - -handle_message({method, Method, Content}, State) -> + {ok, #ch{state = starting, + reader_pid = ReaderPid, + writer_pid = WriterPid, + limiter_pid = undefined, + transaction_id = none, + tx_participants = sets:new(), + next_tag = 1, + uncommitted_ack_q = queue:new(), + unacked_message_q = queue:new(), + username = Username, + virtual_host = VHost, + most_recently_declared_queue = <<>>, + consumer_mapping = dict:new()}}. + +handle_call(_Request, _From, State) -> + noreply(State). + +handle_cast({method, Method, Content}, State) -> try handle_method(Method, Content, State) of {reply, Reply, NewState} -> ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply), - NewState; + noreply(NewState); {noreply, NewState} -> - NewState; + noreply(NewState); stop -> - exit(normal) + {stop, normal, State#ch{state = terminating}} catch exit:{amqp, Error, Explanation, none} -> - terminate({amqp, Error, Explanation, - rabbit_misc:method_record_type(Method)}, - State); + {stop, {amqp, Error, Explanation, + rabbit_misc:method_record_type(Method)}, State}; exit:normal -> - terminate(normal, State); + {stop, normal, State}; _:Reason -> - terminate({Reason, erlang:get_stacktrace()}, State) + {stop, {Reason, erlang:get_stacktrace()}, State} end; -handle_message(terminate, State) -> - terminate(normal, State); +handle_cast(terminate, State) -> + {stop, normal, State}; -handle_message({command, Msg}, State = #ch{writer_pid = WriterPid}) -> +handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, Msg), - State; + noreply(State); -handle_message({deliver, ConsumerTag, AckRequired, Msg}, - State = #ch{proxy_pid = ProxyPid, - writer_pid = WriterPid, - next_tag = DeliveryTag}) -> +handle_cast({deliver, ConsumerTag, AckRequired, Msg}, + State = #ch{writer_pid = WriterPid, + next_tag = DeliveryTag}) -> State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State), - ok = internal_deliver(WriterPid, ProxyPid, - true, ConsumerTag, DeliveryTag, Msg), - State1#ch{next_tag = DeliveryTag + 1}; + ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg), + noreply(State1#ch{next_tag = DeliveryTag + 1}); -handle_message({conserve_memory, Conserve}, State) -> +handle_cast({conserve_memory, Conserve}, State) -> ok = clear_permission_cache(), ok = rabbit_writer:send_command( State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}), - State; - -handle_message({'EXIT', Pid, Reason}, State = #ch{proxy_pid = Pid}) -> - terminate(Reason, State); + noreply(State). -handle_message({'EXIT', _Pid, normal}, State) -> - State; +handle_info({'EXIT', _Pid, Reason}, State) -> + {stop, Reason, State}; -handle_message({'EXIT', _Pid, Reason}, State) -> - terminate(Reason, State); +handle_info(timeout, State) -> + %% TODO: Once we drop support for R11B-5, we can change this to + %% {noreply, State, hibernate}; + proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]). -handle_message(Other, State) -> - terminate({unexpected_channel_message, Other}, State). - -%%--------------------------------------------------------------------------- +terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid, + state = terminating}) -> + rabbit_writer:shutdown(WriterPid), + rabbit_limiter:shutdown(LimiterPid); terminate(Reason, State = #ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) -> @@ -175,8 +170,14 @@ terminate(Reason, State = #ch{writer_pid = WriterPid, _ -> ok end, rabbit_writer:shutdown(WriterPid), - rabbit_limiter:shutdown(LimiterPid), - exit(Reason). + rabbit_limiter:shutdown(LimiterPid). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%--------------------------------------------------------------------------- + +noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}. return_ok(State, true, _Msg) -> {noreply, State}; return_ok(State, false, Msg) -> {reply, Msg, State}. @@ -281,7 +282,6 @@ handle_method(_Method, _, #ch{state = starting}) -> handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> ok = notify_queues(internal_rollback(State)), ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}), - ok = rabbit_writer:shutdown(WriterPid), stop; handle_method(#'access.request'{},_, State) -> @@ -320,7 +320,7 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, true -> ok end, {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), - Participants = ack(State#ch.proxy_pid, TxnKey, Acked), + Participants = ack(TxnKey, Acked), {noreply, case TxnKey of none -> ok = notify_limiter(State#ch.limiter_pid, Acked), State#ch{unacked_message_q = Remaining}; @@ -334,13 +334,13 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, - _, State = #ch{ proxy_pid = ProxyPid, writer_pid = WriterPid, + _, State = #ch{ writer_pid = WriterPid, next_tag = DeliveryTag }) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_messaging_permitted(QueueName, State), case rabbit_amqqueue:with_or_die( QueueName, - fun (Q) -> rabbit_amqqueue:basic_get(Q, ProxyPid, NoAck) end) of + fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, Msg = {_QName, _QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, @@ -366,8 +366,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, no_ack = NoAck, exclusive = ExclusiveConsume, nowait = NoWait}, - _, State = #ch{ proxy_pid = ProxyPid, - reader_pid = ReaderPid, + _, State = #ch{ reader_pid = ReaderPid, limiter_pid = LimiterPid, consumer_mapping = ConsumerMapping }) -> case dict:find(ConsumerTag, ConsumerMapping) of @@ -387,7 +386,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, QueueName, fun (Q) -> rabbit_amqqueue:basic_consume( - Q, NoAck, ReaderPid, ProxyPid, LimiterPid, + Q, NoAck, ReaderPid, self(), LimiterPid, ActualConsumerTag, ExclusiveConsume, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})) @@ -418,8 +417,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, - _, State = #ch{ proxy_pid = ProxyPid, - consumer_mapping = ConsumerMapping }) -> + _, State = #ch{consumer_mapping = ConsumerMapping }) -> OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, case dict:find(ConsumerTag, ConsumerMapping) of error -> @@ -440,7 +438,7 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, %% cancel_ok ourselves it might overtake a %% message sent previously by the queue. rabbit_amqqueue:basic_cancel( - Q, ProxyPid, ConsumerTag, + Q, self(), ConsumerTag, ok_msg(NoWait, #'basic.cancel_ok'{ consumer_tag = ConsumerTag})) end) of @@ -460,13 +458,12 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> "prefetch_size!=0 (~w)", [Size]); handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, - _, State = #ch{ limiter_pid = LimiterPid, - proxy_pid = ProxyPid }) -> + _, State = #ch{ limiter_pid = LimiterPid }) -> NewLimiterPid = case {LimiterPid, PrefetchCount} of {undefined, 0} -> undefined; {undefined, _} -> - LPid = rabbit_limiter:start_link(ProxyPid), + LPid = rabbit_limiter:start_link(self()), ok = limit_queues(LPid, State), LPid; {_, 0} -> @@ -481,7 +478,6 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, handle_method(#'basic.recover'{requeue = true}, _, State = #ch{ transaction_id = none, - proxy_pid = ProxyPid, unacked_message_q = UAMQ }) -> ok = fold_per_queue( fun (QPid, MsgIds, ok) -> @@ -490,14 +486,13 @@ handle_method(#'basic.recover'{requeue = true}, %% order. To keep it happy we reverse the id list %% since we are given them in reverse order. rabbit_amqqueue:requeue( - QPid, lists:reverse(MsgIds), ProxyPid) + QPid, lists:reverse(MsgIds), self()) end, ok, UAMQ), %% No answer required, apparently! {noreply, State#ch{unacked_message_q = queue:new()}}; handle_method(#'basic.recover'{requeue = false}, _, State = #ch{ transaction_id = none, - proxy_pid = ProxyPid, writer_pid = WriterPid, unacked_message_q = UAMQ }) -> lists:foreach( @@ -515,8 +510,7 @@ handle_method(#'basic.recover'{requeue = false}, %% %% FIXME: should we allocate a fresh DeliveryTag? ok = internal_deliver( - WriterPid, ProxyPid, - false, ConsumerTag, DeliveryTag, + WriterPid, false, ConsumerTag, DeliveryTag, {QName, QPid, MsgId, true, Message}) end, queue:to_list(UAMQ)), %% No answer required, apparently! @@ -816,10 +810,10 @@ add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) -> State#ch{tx_participants = sets:union(Participants, sets:from_list(MoreP))}. -ack(ProxyPid, TxnKey, UAQ) -> +ack(TxnKey, UAQ) -> fold_per_queue( fun (QPid, MsgIds, L) -> - ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, ProxyPid), + ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, self()), [QPid | L] end, [], UAQ). @@ -873,11 +867,11 @@ fold_per_queue(F, Acc0, UAQ) -> dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). -notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) -> - rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), ProxyPid). +notify_queues(#ch{consumer_mapping = Consumers}) -> + rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()). -limit_queues(LPid, #ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) -> - rabbit_amqqueue:limit_all(consumer_queues(Consumers), ProxyPid, LPid). +limit_queues(LPid, #ch{consumer_mapping = Consumers}) -> + rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid). consumer_queues(Consumers) -> [QPid || QueueName <- @@ -921,7 +915,7 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> lock_message(false, _MsgStruct, State) -> State. -internal_deliver(WriterPid, ChPid, Notify, ConsumerTag, DeliveryTag, +internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, @@ -933,6 +927,6 @@ internal_deliver(WriterPid, ChPid, Notify, ConsumerTag, DeliveryTag, routing_key = RoutingKey}, ok = case Notify of true -> rabbit_writer:send_command_and_notify( - WriterPid, QPid, ChPid, M, Content); + WriterPid, QPid, self(), M, Content); false -> rabbit_writer:send_command(WriterPid, M, Content) end. diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 855f3ab5..20a66ac5 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -75,6 +75,7 @@ start_link(ChPid) -> shutdown(undefined) -> ok; shutdown(LimiterPid) -> + unlink(LimiterPid), gen_server:cast(LimiterPid, shutdown). limit(undefined, 0) -> diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index ad653a2f..26d857be 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -100,7 +100,7 @@ deliver_per_node(NodeQPids, Mandatory = false, Immediate = false, %% than the non-immediate case below. {ok, lists:flatmap( fun ({Node, QPids}) -> - gen_server:cast( + gen_server2:cast( {?SERVER, Node}, {deliver, QPids, Mandatory, Immediate, Txn, Message}), QPids @@ -110,7 +110,7 @@ deliver_per_node(NodeQPids, Mandatory, Immediate, Txn, Message) -> R = rabbit_misc:upmap( fun ({Node, QPids}) -> - try gen_server:call( + try gen_server2:call( {?SERVER, Node}, {deliver, QPids, Mandatory, Immediate, Txn, Message}) catch |