diff options
author | David Wragg <david@rabbitmq.com> | 2010-09-19 20:54:12 +0100 |
---|---|---|
committer | David Wragg <david@rabbitmq.com> | 2010-09-19 20:54:12 +0100 |
commit | aa9cfe03ab1ebe14706e872a3a218825b382ad20 (patch) | |
tree | 418993452fc759579445adef145aaf59c0996e27 | |
parent | 8b5d897228b95f79eb5f924679ad554da248c822 (diff) | |
parent | 582ef11941c9ebcb27fb1e1b4f38e30a372e2118 (diff) | |
download | rabbitmq-server-aa9cfe03ab1ebe14706e872a3a218825b382ad20.tar.gz |
Merge bug23255 into default (unresolved dependency on inets)
-rw-r--r-- | LICENSE | 2 | ||||
-rw-r--r-- | docs/html-to-website-xml.xsl | 2 | ||||
-rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 3 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/changelog | 6 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/control | 2 | ||||
-rwxr-xr-x | scripts/rabbitmq-server | 2 | ||||
-rwxr-xr-x | scripts/rabbitmqctl | 1 | ||||
-rw-r--r-- | src/file_handle_cache.erl | 89 | ||||
-rw-r--r-- | src/gen_server2.erl | 923 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 36 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 28 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 61 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 22 | ||||
-rw-r--r-- | src/rabbit_channel_sup_sup.erl | 4 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 68 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 7 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 32 | ||||
-rw-r--r-- | src/rabbit_msg_store_gc.erl | 7 | ||||
-rw-r--r-- | src/rabbit_multi.erl | 14 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 25 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 5 | ||||
-rw-r--r-- | src/rabbit_router.erl | 4 | ||||
-rw-r--r-- | src/worker_pool_worker.erl | 7 |
23 files changed, 745 insertions, 605 deletions
@@ -1,5 +1,5 @@ This package, the RabbitMQ server is licensed under the MPL. For the MPL, please see LICENSE-MPL-RabbitMQ. -If you have any questions regarding licensing, please contact us at +If you have any questions regarding licensing, please contact us at info@rabbitmq.com. diff --git a/docs/html-to-website-xml.xsl b/docs/html-to-website-xml.xsl index 662dbea0..c325bb5a 100644 --- a/docs/html-to-website-xml.xsl +++ b/docs/html-to-website-xml.xsl @@ -30,7 +30,7 @@ <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>. </p> <p> - <a href="manpages.html">See a list of all manual pages</a>. + <a href="../manpages.html">See a list of all manual pages</a>. </p> </xsl:when> <xsl:otherwise> diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 17518ddf..eb0a2a51 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -127,6 +127,9 @@ done rm -rf %{buildroot} %changelog +* Tue Sep 14 2010 marek@rabbitmq.com 2.1.0-1 +- New Upstream Release + * Mon Aug 23 2010 mikeb@rabbitmq.com 2.0.0-1 - New Upstream Release diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index 7ee60016..9927cfbc 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,9 @@ +rabbitmq-server (2.1.0-1) lucid; urgency=low + + * New Upstream Release + + -- Marek Majkowski <marek@rabbitmq.com> Tue, 14 Sep 2010 14:20:17 +0100 + rabbitmq-server (2.0.0-1) karmic; urgency=low * New Upstream Release diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index 89e7bb4a..02da0cc6 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -1,7 +1,7 @@ Source: rabbitmq-server Section: net Priority: extra -Maintainer: Tony Garnock-Jones <tonyg@rabbitmq.com> +Maintainer: RabbitMQ Team <packaging@rabbitmq.com> Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc Standards-Version: 3.8.0 diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 9310752f..8e26663a 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -82,7 +82,7 @@ fi [ -f "${RABBITMQ_SASL_LOGS}" ] && cat "${RABBITMQ_SASL_LOGS}" >> "${RABBITMQ_SASL_LOGS}${RABBITMQ_BACKUP_EXTENSION}" RABBITMQ_START_RABBIT= -[ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT='-noinput' +[ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT='-noinput' RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin" if [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl index 92e5312b..76ce25fd 100755 --- a/scripts/rabbitmqctl +++ b/scripts/rabbitmqctl @@ -47,4 +47,3 @@ exec erl \ -s rabbit_control \ -nodename $RABBITMQ_NODENAME \ -extra "$@" - diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index aecfb096..d2830a25 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -34,13 +34,15 @@ %% A File Handle Cache %% %% This extends a subset of the functionality of the Erlang file -%% module. +%% module. In the below, we use "file handle" to specifically refer to +%% file handles, and "file descriptor" to refer to descriptors which +%% are not file handles, e.g. sockets. %% %% Some constraints %% 1) This supports one writer, multiple readers per file. Nothing %% else. %% 2) Do not open the same file from different processes. Bad things -%% may happen. +%% may happen, especially for writes. %% 3) Writes are all appends. You cannot write to the middle of a %% file, although you can truncate and then append if you want. %% 4) Although there is a write buffer, there is no read buffer. Feel @@ -49,10 +51,10 @@ %% %% Some benefits %% 1) You do not have to remember to call sync before close -%% 2) Buffering is much more flexible than with plain file module, and -%% you can control when the buffer gets flushed out. This means that -%% you can rely on reads-after-writes working, without having to call -%% the expensive sync. +%% 2) Buffering is much more flexible than with the plain file module, +%% and you can control when the buffer gets flushed out. This means +%% that you can rely on reads-after-writes working, without having to +%% call the expensive sync. %% 3) Unnecessary calls to position and sync get optimised out. %% 4) You can find out what your 'real' offset is, and what your %% 'virtual' offset is (i.e. where the hdl really is, and where it @@ -60,14 +62,19 @@ %% 5) You can find out what the offset was when you last sync'd. %% %% There is also a server component which serves to limit the number -%% of open file handles in a "soft" way - the server will never -%% prevent a client from opening a handle, but may immediately tell it -%% to close the handle. Thus you can set the limit to zero and it will -%% still all work correctly, it is just that effectively no caching -%% will take place. The operation of limiting is as follows: +%% of open file descriptors. This is a hard limit: the server +%% component will ensure that clients do not have more file +%% descriptors open than it's configured to allow. %% -%% On open and close, the client sends messages to the server -%% informing it of opens and closes. This allows the server to keep +%% On open, the client requests permission from the server to open the +%% required number of file handles. The server may ask the client to +%% close other file handles that it has open, or it may queue the +%% request and ask other clients to close file handles they have open +%% in order to satisfy the request. Requests are always satisfied in +%% the order they arrive, even if a latter request (for a small number +%% of file handles) can be satisfied before an earlier request (for a +%% larger number of file handles). On close, the client sends a +%% message to the server. These messages allow the server to keep %% track of the number of open handles. The client also keeps a %% gb_tree which is updated on every use of a file handle, mapping the %% time at which the file handle was last used (timestamp) to the @@ -81,21 +88,38 @@ %% Note that this data can go very out of date, by the client using %% the least recently used handle. %% -%% When the limit is reached, the server calculates the average age of -%% the last reported least recently used file handle of all the -%% clients. It then tells all the clients to close any handles not -%% used for longer than this average, by invoking the callback the -%% client registered. The client should receive this message and pass -%% it into set_maximum_since_use/1. However, it is highly possible -%% this age will be greater than the ages of all the handles the -%% client knows of because the client has used its file handles in the -%% mean time. Thus at this point the client reports to the server the +%% When the limit is exceeded (i.e. the number of open file handles is +%% at the limit and there are pending 'open' requests), the server +%% calculates the average age of the last reported least recently used +%% file handle of all the clients. It then tells all the clients to +%% close any handles not used for longer than this average, by +%% invoking the callback the client registered. The client should +%% receive this message and pass it into +%% set_maximum_since_use/1. However, it is highly possible this age +%% will be greater than the ages of all the handles the client knows +%% of because the client has used its file handles in the mean +%% time. Thus at this point the client reports to the server the %% current timestamp at which its least recently used file handle was %% last used. The server will check two seconds later that either it %% is back under the limit, in which case all is well again, or if %% not, it will calculate a new average age. Its data will be much %% more recent now, and so it is very likely that when this is %% communicated to the clients, the clients will close file handles. +%% (In extreme cases, where it's very likely that all clients have +%% used their open handles since they last sent in an update, which +%% would mean that the average will never cause any file handles to +%% be closed, the server can send out an average age of 0, resulting +%% in all available clients closing all their file handles.) +%% +%% Care is taken to ensure that (a) processes which are blocked +%% waiting for file descriptors to become available are not sent +%% requests to close file handles; and (b) given it is known how many +%% file handles a process has open, when the average age is forced to +%% 0, close messages are only sent to enough processes to release the +%% correct number of file handles and the list of processes is +%% randomly shuffled. This ensures we don't cause processes to +%% needlessly close file handles, and ensures that we don't always +%% make such requests of the same processes. %% %% The advantage of this scheme is that there is only communication %% from the client to the server on open, close, and when in the @@ -103,11 +127,7 @@ %% communication from the client to the server on normal file handle %% operations. This scheme forms a feed-back loop - the server does %% not care which file handles are closed, just that some are, and it -%% checks this repeatedly when over the limit. Given the guarantees of -%% now(), even if there is just one file handle open, a limit of 1, -%% and one client, it is certain that when the client calculates the -%% age of the handle, it will be greater than when the server -%% calculated it, hence it should be closed. +%% checks this repeatedly when over the limit. %% %% Handles which are closed as a result of the server are put into a %% "soft-closed" state in which the handle is closed (data flushed out @@ -117,8 +137,19 @@ %% - reopening them when necessary is handled transparently. %% %% The server also supports obtain and transfer. obtain/0 blocks until -%% a file descriptor is available. transfer/1 is transfers ownership -%% of a file descriptor between processes. It is non-blocking. +%% a file descriptor is available, at which point the requesting +%% process is considered to 'own' one more descriptor. transfer/1 +%% transfers ownership of a file descriptor between processes. It is +%% non-blocking. Obtain is used to obtain permission to accept file +%% descriptors. Obtain has a lower limit, set by the ?OBTAIN_LIMIT/1 +%% macro. File handles can use the entire limit, but will be evicted +%% by obtain calls up to the point at which no more obtain calls can +%% be satisfied by the obtains limit. Thus there will always be some +%% capacity available for file handles. Processes that use obtain are +%% never asked to return them, and they are not managed in any way by +%% the server. It is simply a mechanism to ensure that processes that +%% need file descriptors such as sockets can do so in such a way that +%% the overall number of open file descriptors is managed. %% %% The callers of register_callback/3, obtain/0, and the argument of %% transfer/1 are monitored, reducing the count of handles in use diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 9fb9e2fe..b0379b95 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -16,10 +16,12 @@ %% The original code could reorder messages when communicating with a %% process on a remote node that was not currently connected. %% -%% 4) The new functions gen_server2:pcall/3, pcall/4, and pcast/3 -%% allow callers to attach priorities to requests. Requests with -%% higher priorities are processed before requests with lower -%% priorities. The default priority is 0. +%% 4) The callback module can optionally implement prioritise_call/3, +%% prioritise_cast/2 and prioritise_info/2. These functions take +%% Message, From and State or just Message and State and return a +%% single integer representing the priority attached to the message. +%% Messages with higher priorities are processed before requests with +%% lower priorities. The default priority is 0. %% %% 5) The callback module can optionally implement %% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be @@ -64,16 +66,16 @@ %% compliance with the License. You should have received a copy of the %% Erlang Public License along with this software. If not, it can be %% retrieved via the world wide web at http://www.erlang.org/. -%% +%% %% Software distributed under the License is distributed on an "AS IS" %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See %% the License for the specific language governing rights and limitations %% under the License. -%% +%% %% The Initial Developer of the Original Code is Ericsson Utvecklings AB. %% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings %% AB. All Rights Reserved.'' -%% +%% %% $Id$ %% -module(gen_server2). @@ -82,13 +84,13 @@ %%% %%% The idea behind THIS server is that the user module %%% provides (different) functions to handle different -%%% kind of inputs. +%%% kind of inputs. %%% If the Parent process terminates the Module:terminate/2 %%% function is called. %%% %%% The user module should export: %%% -%%% init(Args) +%%% init(Args) %%% ==> {ok, State} %%% {ok, State, Timeout} %%% {ok, State, Timeout, Backoff} @@ -101,21 +103,21 @@ %%% {reply, Reply, State, Timeout} %%% {noreply, State} %%% {noreply, State, Timeout} -%%% {stop, Reason, Reply, State} +%%% {stop, Reason, Reply, State} %%% Reason = normal | shutdown | Term terminate(State) is called %%% %%% handle_cast(Msg, State) %%% %%% ==> {noreply, State} %%% {noreply, State, Timeout} -%%% {stop, Reason, State} +%%% {stop, Reason, State} %%% Reason = normal | shutdown | Term terminate(State) is called %%% %%% handle_info(Info, State) Info is e.g. {'EXIT', P, R}, {nodedown, N}, ... %%% %%% ==> {noreply, State} %%% {noreply, State, Timeout} -%%% {stop, Reason, State} +%%% {stop, Reason, State} %%% Reason = normal | shutdown | Term, terminate(State) is called %%% %%% terminate(Reason, State) Let the user module clean up @@ -159,37 +161,41 @@ %% API -export([start/3, start/4, - start_link/3, start_link/4, - call/2, call/3, pcall/3, pcall/4, - cast/2, pcast/3, reply/2, - abcast/2, abcast/3, - multi_call/2, multi_call/3, multi_call/4, - enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/7]). + start_link/3, start_link/4, + call/2, call/3, + cast/2, reply/2, + abcast/2, abcast/3, + multi_call/2, multi_call/3, multi_call/4, + enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/1]). -export([behaviour_info/1]). %% System exports -export([system_continue/3, - system_terminate/4, - system_code_change/4, - format_status/2]). + system_terminate/4, + system_code_change/4, + format_status/2]). %% Internal exports -export([init_it/6, print_event/3]). -import(error_logger, [format/2]). +%% State record +-record(gs2_state, {parent, name, state, mod, time, + timeout_state, queue, debug, prioritise_call, + prioritise_cast, prioritise_info}). + %%%========================================================================= %%% Specs. These exist only to shut up dialyzer's warnings %%%========================================================================= -ifdef(use_specs). --spec(handle_common_termination/6 :: - (any(), any(), any(), atom(), any(), any()) -> no_return()). +-spec(handle_common_termination/3 :: + (any(), atom(), #gs2_state{}) -> no_return()). --spec(hibernate/7 :: - (pid(), any(), any(), atom(), any(), queue(), any()) -> no_return()). +-spec(hibernate/1 :: (#gs2_state{}) -> no_return()). -endif. @@ -238,37 +244,21 @@ start_link(Name, Mod, Args, Options) -> %% be monitored. %% If the client is trapping exits and is linked server termination %% is handled here (? Shall we do that here (or rely on timeouts) ?). -%% ----------------------------------------------------------------- +%% ----------------------------------------------------------------- call(Name, Request) -> case catch gen:call(Name, '$gen_call', Request) of - {ok,Res} -> - Res; - {'EXIT',Reason} -> - exit({Reason, {?MODULE, call, [Name, Request]}}) + {ok,Res} -> + Res; + {'EXIT',Reason} -> + exit({Reason, {?MODULE, call, [Name, Request]}}) end. call(Name, Request, Timeout) -> case catch gen:call(Name, '$gen_call', Request, Timeout) of - {ok,Res} -> - Res; - {'EXIT',Reason} -> - exit({Reason, {?MODULE, call, [Name, Request, Timeout]}}) - end. - -pcall(Name, Priority, Request) -> - case catch gen:call(Name, '$gen_pcall', {Priority, Request}) of - {ok,Res} -> - Res; - {'EXIT',Reason} -> - exit({Reason, {?MODULE, pcall, [Name, Priority, Request]}}) - end. - -pcall(Name, Priority, Request, Timeout) -> - case catch gen:call(Name, '$gen_pcall', {Priority, Request}, Timeout) of - {ok,Res} -> - Res; - {'EXIT',Reason} -> - exit({Reason, {?MODULE, pcall, [Name, Priority, Request, Timeout]}}) + {ok,Res} -> + Res; + {'EXIT',Reason} -> + exit({Reason, {?MODULE, call, [Name, Request, Timeout]}}) end. %% ----------------------------------------------------------------- @@ -277,34 +267,18 @@ pcall(Name, Priority, Request, Timeout) -> cast({global,Name}, Request) -> catch global:send(Name, cast_msg(Request)), ok; -cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) -> +cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) -> do_cast(Dest, Request); cast(Dest, Request) when is_atom(Dest) -> do_cast(Dest, Request); cast(Dest, Request) when is_pid(Dest) -> do_cast(Dest, Request). -do_cast(Dest, Request) -> +do_cast(Dest, Request) -> do_send(Dest, cast_msg(Request)), ok. - -cast_msg(Request) -> {'$gen_cast',Request}. -pcast({global,Name}, Priority, Request) -> - catch global:send(Name, cast_msg(Priority, Request)), - ok; -pcast({Name,Node}=Dest, Priority, Request) when is_atom(Name), is_atom(Node) -> - do_cast(Dest, Priority, Request); -pcast(Dest, Priority, Request) when is_atom(Dest) -> - do_cast(Dest, Priority, Request); -pcast(Dest, Priority, Request) when is_pid(Dest) -> - do_cast(Dest, Priority, Request). - -do_cast(Dest, Priority, Request) -> - do_send(Dest, cast_msg(Priority, Request)), - ok. - -cast_msg(Priority, Request) -> {'$gen_pcast', {Priority, Request}}. +cast_msg(Request) -> {'$gen_cast',Request}. %% ----------------------------------------------------------------- %% Send a reply to the client. @@ -312,9 +286,9 @@ cast_msg(Priority, Request) -> {'$gen_pcast', {Priority, Request}}. reply({To, Tag}, Reply) -> catch To ! {Tag, Reply}. -%% ----------------------------------------------------------------- -%% Asyncronous broadcast, returns nothing, it's just send'n prey -%%----------------------------------------------------------------- +%% ----------------------------------------------------------------- +%% Asyncronous broadcast, returns nothing, it's just send'n pray +%% ----------------------------------------------------------------- abcast(Name, Request) when is_atom(Name) -> do_abcast([node() | nodes()], Name, cast_msg(Request)). @@ -330,36 +304,36 @@ do_abcast([], _,_) -> abcast. %%% Make a call to servers at several nodes. %%% Returns: {[Replies],[BadNodes]} %%% A Timeout can be given -%%% +%%% %%% A middleman process is used in case late answers arrives after %%% the timeout. If they would be allowed to glog the callers message -%%% queue, it would probably become confused. Late answers will +%%% queue, it would probably become confused. Late answers will %%% now arrive to the terminated middleman and so be discarded. %%% ----------------------------------------------------------------- multi_call(Name, Req) when is_atom(Name) -> do_multi_call([node() | nodes()], Name, Req, infinity). -multi_call(Nodes, Name, Req) +multi_call(Nodes, Name, Req) when is_list(Nodes), is_atom(Name) -> do_multi_call(Nodes, Name, Req, infinity). multi_call(Nodes, Name, Req, infinity) -> do_multi_call(Nodes, Name, Req, infinity); -multi_call(Nodes, Name, Req, Timeout) +multi_call(Nodes, Name, Req, Timeout) when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 -> do_multi_call(Nodes, Name, Req, Timeout). %%----------------------------------------------------------------- -%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_ -%% -%% Description: Makes an existing process into a gen_server. -%% The calling process will enter the gen_server receive +%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_ +%% +%% Description: Makes an existing process into a gen_server. +%% The calling process will enter the gen_server receive %% loop and become a gen_server process. -%% The process *must* have been started using one of the -%% start functions in proc_lib, see proc_lib(3). -%% The user is responsible for any initialization of the +%% The process *must* have been started using one of the +%% start functions in proc_lib, see proc_lib(3). +%% The user is responsible for any initialization of the %% process, including registering a name for it. %%----------------------------------------------------------------- enter_loop(Mod, Options, State) -> @@ -386,7 +360,10 @@ enter_loop(Mod, Options, State, ServerName, Timeout, Backoff) -> Debug = debug_options(Name, Options), Queue = priority_queue:new(), Backoff1 = extend_backoff(Backoff), - loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug). + loop(find_prioritisers( + #gs2_state { parent = Parent, name = Name, state = State, + mod = Mod, time = Timeout, timeout_state = Backoff1, + queue = Queue, debug = Debug })). %%%======================================================================== %%% Gen-callback functions @@ -405,39 +382,51 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) -> Name = name(Name0), Debug = debug_options(Name, Options), Queue = priority_queue:new(), + GS2State = find_prioritisers( + #gs2_state { parent = Parent, + name = Name, + mod = Mod, + queue = Queue, + debug = Debug }), case catch Mod:init(Args) of - {ok, State} -> - proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, infinity, undefined, Queue, Debug); - {ok, State, Timeout} -> - proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, Timeout, undefined, Queue, Debug); - {ok, State, Timeout, Backoff = {backoff, _, _, _}} -> + {ok, State} -> + proc_lib:init_ack(Starter, {ok, self()}), + loop(GS2State #gs2_state { state = State, + time = infinity, + timeout_state = undefined }); + {ok, State, Timeout} -> + proc_lib:init_ack(Starter, {ok, self()}), + loop(GS2State #gs2_state { state = State, + time = Timeout, + timeout_state = undefined }); + {ok, State, Timeout, Backoff = {backoff, _, _, _}} -> Backoff1 = extend_backoff(Backoff), - proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug); - {stop, Reason} -> - %% For consistency, we must make sure that the - %% registered name (if any) is unregistered before - %% the parent process is notified about the failure. - %% (Otherwise, the parent process could get - %% an 'already_started' error if it immediately - %% tried starting the process again.) - unregister_name(Name0), - proc_lib:init_ack(Starter, {error, Reason}), - exit(Reason); - ignore -> - unregister_name(Name0), - proc_lib:init_ack(Starter, ignore), - exit(normal); - {'EXIT', Reason} -> - unregister_name(Name0), - proc_lib:init_ack(Starter, {error, Reason}), - exit(Reason); - Else -> - Error = {bad_return_value, Else}, - proc_lib:init_ack(Starter, {error, Error}), - exit(Error) + proc_lib:init_ack(Starter, {ok, self()}), + loop(GS2State #gs2_state { state = State, + time = Timeout, + timeout_state = Backoff1 }); + {stop, Reason} -> + %% For consistency, we must make sure that the + %% registered name (if any) is unregistered before + %% the parent process is notified about the failure. + %% (Otherwise, the parent process could get + %% an 'already_started' error if it immediately + %% tried starting the process again.) + unregister_name(Name0), + proc_lib:init_ack(Starter, {error, Reason}), + exit(Reason); + ignore -> + unregister_name(Name0), + proc_lib:init_ack(Starter, ignore), + exit(normal); + {'EXIT', Reason} -> + unregister_name(Name0), + proc_lib:init_ack(Starter, {error, Reason}), + exit(Reason); + Else -> + Error = {bad_return_value, Else}, + proc_lib:init_ack(Starter, {error, Error}), + exit(Error) end. name({local,Name}) -> Name; @@ -467,23 +456,24 @@ extend_backoff({backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod}) -> %%% --------------------------------------------------- %%% The MAIN loop. %%% --------------------------------------------------- -loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) -> - pre_hibernate(Parent, Name, State, Mod, undefined, Queue, Debug); -loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> - process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, - drain(Queue), Debug). +loop(GS2State = #gs2_state { time = hibernate, + timeout_state = undefined }) -> + pre_hibernate(GS2State); +loop(GS2State) -> + process_next_msg(drain(GS2State)). -drain(Queue) -> +drain(GS2State) -> receive - Input -> drain(in(Input, Queue)) - after 0 -> Queue + Input -> drain(in(Input, GS2State)) + after 0 -> GS2State end. -process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> +process_next_msg(GS2State = #gs2_state { time = Time, + timeout_state = TimeoutState, + queue = Queue }) -> case priority_queue:out(Queue) of {{value, Msg}, Queue1} -> - process_msg(Parent, Name, State, Mod, - Time, TimeoutState, Queue1, Debug, Msg); + process_msg(Msg, GS2State #gs2_state { queue = Queue1 }); {empty, Queue1} -> {Time1, HibOnTimeout} = case {Time, TimeoutState} of @@ -504,68 +494,64 @@ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> Input -> %% Time could be 'hibernate' here, so *don't* call loop process_next_msg( - Parent, Name, State, Mod, Time, TimeoutState, - drain(in(Input, Queue1)), Debug) + drain(in(Input, GS2State #gs2_state { queue = Queue1 }))) after Time1 -> case HibOnTimeout of true -> pre_hibernate( - Parent, Name, State, Mod, TimeoutState, Queue1, - Debug); + GS2State #gs2_state { queue = Queue1 }); false -> - process_msg( - Parent, Name, State, Mod, Time, TimeoutState, - Queue1, Debug, timeout) + process_msg(timeout, + GS2State #gs2_state { queue = Queue1 }) end end end. -wake_hib(Parent, Name, State, Mod, TS, Queue, Debug) -> +wake_hib(GS2State = #gs2_state { timeout_state = TS }) -> TimeoutState1 = case TS of undefined -> undefined; {SleptAt, TimeoutState} -> adjust_timeout_state(SleptAt, now(), TimeoutState) end, - post_hibernate(Parent, Name, State, Mod, TimeoutState1, - drain(Queue), Debug). + post_hibernate( + drain(GS2State #gs2_state { timeout_state = TimeoutState1 })). -hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> +hibernate(GS2State = #gs2_state { timeout_state = TimeoutState }) -> TS = case TimeoutState of undefined -> undefined; {backoff, _, _, _, _} -> {now(), TimeoutState} end, - proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod, - TS, Queue, Debug]). + proc_lib:hibernate(?MODULE, wake_hib, + [GS2State #gs2_state { timeout_state = TS }]). -pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> +pre_hibernate(GS2State = #gs2_state { state = State, + mod = Mod }) -> case erlang:function_exported(Mod, handle_pre_hibernate, 1) of true -> case catch Mod:handle_pre_hibernate(State) of {hibernate, NState} -> - hibernate(Parent, Name, NState, Mod, TimeoutState, Queue, - Debug); + hibernate(GS2State #gs2_state { state = NState } ); Reply -> - handle_common_termination(Reply, Name, pre_hibernate, - Mod, State, Debug) + handle_common_termination(Reply, pre_hibernate, GS2State) end; false -> - hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) + hibernate(GS2State) end. -post_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> +post_hibernate(GS2State = #gs2_state { state = State, + mod = Mod }) -> case erlang:function_exported(Mod, handle_post_hibernate, 1) of true -> case catch Mod:handle_post_hibernate(State) of {noreply, NState} -> - process_next_msg(Parent, Name, NState, Mod, infinity, - TimeoutState, Queue, Debug); + process_next_msg(GS2State #gs2_state { state = NState, + time = infinity }); {noreply, NState, Time} -> - process_next_msg(Parent, Name, NState, Mod, Time, - TimeoutState, Queue, Debug); + process_next_msg(GS2State #gs2_state { state = NState, + time = Time }); Reply -> - handle_common_termination(Reply, Name, post_hibernate, - Mod, State, Debug) + handle_common_termination(Reply, post_hibernate, GS2State) end; false -> %% use hibernate here, not infinity. This matches @@ -574,8 +560,7 @@ post_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> %% still set to hibernate, iff that msg is the very msg %% that woke us up (or the first msg we receive after %% waking up). - process_next_msg(Parent, Name, State, Mod, hibernate, - TimeoutState, Queue, Debug) + process_next_msg(GS2State #gs2_state { time = hibernate }) end. adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO, @@ -596,32 +581,40 @@ adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO, CurrentTO1 = Base + Extra, {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}. -in({'$gen_pcast', {Priority, Msg}}, Queue) -> - priority_queue:in({'$gen_cast', Msg}, Priority, Queue); -in({'$gen_pcall', From, {Priority, Msg}}, Queue) -> - priority_queue:in({'$gen_call', From, Msg}, Priority, Queue); -in(Input, Queue) -> - priority_queue:in(Input, Queue). - -process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, - Debug, Msg) -> +in({'$gen_cast', Msg}, GS2State = #gs2_state { prioritise_cast = PC, + queue = Queue }) -> + GS2State #gs2_state { queue = priority_queue:in( + {'$gen_cast', Msg}, + PC(Msg, GS2State), Queue) }; +in({'$gen_call', From, Msg}, GS2State = #gs2_state { prioritise_call = PC, + queue = Queue }) -> + GS2State #gs2_state { queue = priority_queue:in( + {'$gen_call', From, Msg}, + PC(Msg, From, GS2State), Queue) }; +in(Input, GS2State = #gs2_state { prioritise_info = PI, queue = Queue }) -> + GS2State #gs2_state { queue = priority_queue:in( + Input, PI(Input, GS2State), Queue) }. + +process_msg(Msg, + GS2State = #gs2_state { parent = Parent, + name = Name, + debug = Debug }) -> case Msg of - {system, From, Req} -> - sys:handle_system_msg( + {system, From, Req} -> + sys:handle_system_msg( Req, From, Parent, ?MODULE, Debug, - [Name, State, Mod, Time, TimeoutState, Queue]); + GS2State); %% gen_server puts Hib on the end as the 7th arg, but that %% version of the function seems not to be documented so %% leaving out for now. - {'EXIT', Parent, Reason} -> - terminate(Reason, Name, Msg, Mod, State, Debug); - _Msg when Debug =:= [] -> - handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue); - _Msg -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, - Name, {in, Msg}), - handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue, - Debug1) + {'EXIT', Parent, Reason} -> + terminate(Reason, Msg, GS2State); + _Msg when Debug =:= [] -> + handle_msg(Msg, GS2State); + _Msg -> + Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, + Name, {in, Msg}), + handle_msg(Msg, GS2State #gs2_state { debug = Debug1 }) end. %%% --------------------------------------------------- @@ -638,35 +631,35 @@ do_multi_call(Nodes, Name, Req, Timeout) -> Tag = make_ref(), Caller = self(), Receiver = - spawn( - fun () -> - %% Middleman process. Should be unsensitive to regular - %% exit signals. The sychronization is needed in case - %% the receiver would exit before the caller started - %% the monitor. - process_flag(trap_exit, true), - Mref = erlang:monitor(process, Caller), - receive - {Caller,Tag} -> - Monitors = send_nodes(Nodes, Name, Tag, Req), - TimerId = erlang:start_timer(Timeout, self(), ok), - Result = rec_nodes(Tag, Monitors, Name, TimerId), - exit({self(),Tag,Result}); - {'DOWN',Mref,_,_,_} -> - %% Caller died before sending us the go-ahead. - %% Give up silently. - exit(normal) - end - end), + spawn( + fun () -> + %% Middleman process. Should be unsensitive to regular + %% exit signals. The sychronization is needed in case + %% the receiver would exit before the caller started + %% the monitor. + process_flag(trap_exit, true), + Mref = erlang:monitor(process, Caller), + receive + {Caller,Tag} -> + Monitors = send_nodes(Nodes, Name, Tag, Req), + TimerId = erlang:start_timer(Timeout, self(), ok), + Result = rec_nodes(Tag, Monitors, Name, TimerId), + exit({self(),Tag,Result}); + {'DOWN',Mref,_,_,_} -> + %% Caller died before sending us the go-ahead. + %% Give up silently. + exit(normal) + end + end), Mref = erlang:monitor(process, Receiver), Receiver ! {self(),Tag}, receive - {'DOWN',Mref,_,_,{Receiver,Tag,Result}} -> - Result; - {'DOWN',Mref,_,_,Reason} -> - %% The middleman code failed. Or someone did - %% exit(_, kill) on the middleman process => Reason==killed - exit(Reason) + {'DOWN',Mref,_,_,{Receiver,Tag,Result}} -> + Result; + {'DOWN',Mref,_,_,Reason} -> + %% The middleman code failed. Or someone did + %% exit(_, kill) on the middleman process => Reason==killed + exit(Reason) end. send_nodes(Nodes, Name, Tag, Req) -> @@ -681,7 +674,7 @@ send_nodes([Node|Tail], Name, Tag, Req, Monitors) send_nodes([_Node|Tail], Name, Tag, Req, Monitors) -> %% Skip non-atom Node send_nodes(Tail, Name, Tag, Req, Monitors); -send_nodes([], _Name, _Tag, _Req, Monitors) -> +send_nodes([], _Name, _Tag, _Req, Monitors) -> Monitors. %% Against old nodes: @@ -691,89 +684,89 @@ send_nodes([], _Name, _Tag, _Req, Monitors) -> %% Against contemporary nodes: %% Wait for reply, server 'DOWN', or timeout from TimerId. -rec_nodes(Tag, Nodes, Name, TimerId) -> +rec_nodes(Tag, Nodes, Name, TimerId) -> rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId). rec_nodes(Tag, [{N,R}|Tail], Name, Badnodes, Replies, Time, TimerId ) -> receive - {'DOWN', R, _, _, _} -> - rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId); - {{Tag, N}, Reply} -> %% Tag is bound !!! - unmonitor(R), - rec_nodes(Tag, Tail, Name, Badnodes, - [{N,Reply}|Replies], Time, TimerId); - {timeout, TimerId, _} -> - unmonitor(R), - %% Collect all replies that already have arrived - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) + {'DOWN', R, _, _, _} -> + rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId); + {{Tag, N}, Reply} -> %% Tag is bound !!! + unmonitor(R), + rec_nodes(Tag, Tail, Name, Badnodes, + [{N,Reply}|Replies], Time, TimerId); + {timeout, TimerId, _} -> + unmonitor(R), + %% Collect all replies that already have arrived + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) end; rec_nodes(Tag, [N|Tail], Name, Badnodes, Replies, Time, TimerId) -> %% R6 node receive - {nodedown, N} -> - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId); - {{Tag, N}, Reply} -> %% Tag is bound !!! - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, Badnodes, - [{N,Reply}|Replies], 2000, TimerId); - {timeout, TimerId, _} -> - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - %% Collect all replies that already have arrived - rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies) + {nodedown, N} -> + monitor_node(N, false), + rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId); + {{Tag, N}, Reply} -> %% Tag is bound !!! + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes(Tag, Tail, Name, Badnodes, + [{N,Reply}|Replies], 2000, TimerId); + {timeout, TimerId, _} -> + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + %% Collect all replies that already have arrived + rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies) after Time -> - case rpc:call(N, erlang, whereis, [Name]) of - Pid when is_pid(Pid) -> % It exists try again. - rec_nodes(Tag, [N|Tail], Name, Badnodes, - Replies, infinity, TimerId); - _ -> % badnode - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, [N|Badnodes], - Replies, 2000, TimerId) - end + case rpc:call(N, erlang, whereis, [Name]) of + Pid when is_pid(Pid) -> % It exists try again. + rec_nodes(Tag, [N|Tail], Name, Badnodes, + Replies, infinity, TimerId); + _ -> % badnode + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes(Tag, Tail, Name, [N|Badnodes], + Replies, 2000, TimerId) + end end; rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) -> case catch erlang:cancel_timer(TimerId) of - false -> % It has already sent it's message - receive - {timeout, TimerId, _} -> ok - after 0 -> - ok - end; - _ -> % Timer was cancelled, or TimerId was 'undefined' - ok + false -> % It has already sent it's message + receive + {timeout, TimerId, _} -> ok + after 0 -> + ok + end; + _ -> % Timer was cancelled, or TimerId was 'undefined' + ok end, {Replies, Badnodes}. %% Collect all replies that already have arrived rec_nodes_rest(Tag, [{N,R}|Tail], Name, Badnodes, Replies) -> receive - {'DOWN', R, _, _, _} -> - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); - {{Tag, N}, Reply} -> %% Tag is bound !!! - unmonitor(R), - rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) + {'DOWN', R, _, _, _} -> + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); + {{Tag, N}, Reply} -> %% Tag is bound !!! + unmonitor(R), + rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) after 0 -> - unmonitor(R), - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) + unmonitor(R), + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) end; rec_nodes_rest(Tag, [N|Tail], Name, Badnodes, Replies) -> %% R6 node receive - {nodedown, N} -> - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); - {{Tag, N}, Reply} -> %% Tag is bound !!! - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) + {nodedown, N} -> + monitor_node(N, false), + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); + {{Tag, N}, Reply} -> %% Tag is bound !!! + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) after 0 -> - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) end; rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) -> {Replies, Badnodes}. @@ -785,28 +778,28 @@ rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) -> start_monitor(Node, Name) when is_atom(Node), is_atom(Name) -> if node() =:= nonode@nohost, Node =/= nonode@nohost -> - Ref = make_ref(), - self() ! {'DOWN', Ref, process, {Name, Node}, noconnection}, - {Node, Ref}; + Ref = make_ref(), + self() ! {'DOWN', Ref, process, {Name, Node}, noconnection}, + {Node, Ref}; true -> - case catch erlang:monitor(process, {Name, Node}) of - {'EXIT', _} -> - %% Remote node is R6 - monitor_node(Node, true), - Node; - Ref when is_reference(Ref) -> - {Node, Ref} - end + case catch erlang:monitor(process, {Name, Node}) of + {'EXIT', _} -> + %% Remote node is R6 + monitor_node(Node, true), + Node; + Ref when is_reference(Ref) -> + {Node, Ref} + end end. %% Cancels a monitor started with Ref=erlang:monitor(_, _). unmonitor(Ref) when is_reference(Ref) -> erlang:demonitor(Ref), receive - {'DOWN', Ref, _, _, _} -> - true + {'DOWN', Ref, _, _, _} -> + true after 0 -> - true + true end. %%% --------------------------------------------------- @@ -818,130 +811,114 @@ dispatch({'$gen_cast', Msg}, Mod, State) -> dispatch(Info, Mod, State) -> Mod:handle_info(Info, State). -handle_msg({'$gen_call', From, Msg}, - Parent, Name, State, Mod, TimeoutState, Queue) -> - case catch Mod:handle_call(Msg, From, State) of - {reply, Reply, NState} -> - reply(From, Reply), - loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); - {reply, Reply, NState, Time1} -> - reply(From, Reply), - loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); - {noreply, NState} -> - loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); - {noreply, NState, Time1} -> - loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); - {stop, Reason, Reply, NState} -> - {'EXIT', R} = - (catch terminate(Reason, Name, Msg, Mod, NState, [])), - reply(From, Reply), - exit(R); - Other -> handle_common_reply(Other, Parent, Name, Msg, Mod, State, - TimeoutState, Queue) - end; -handle_msg(Msg, - Parent, Name, State, Mod, TimeoutState, Queue) -> - Reply = (catch dispatch(Msg, Mod, State)), - handle_common_reply(Reply, Parent, Name, Msg, Mod, State, - TimeoutState, Queue). - -handle_msg({'$gen_call', From, Msg}, - Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> +common_reply(_Name, From, Reply, _NState, [] = _Debug) -> + reply(From, Reply), + []; +common_reply(Name, From, Reply, NState, Debug) -> + reply(Name, From, Reply, NState, Debug). + +common_debug([] = _Debug, _Func, _Info, _Event) -> + []; +common_debug(Debug, Func, Info, Event) -> + sys:handle_debug(Debug, Func, Info, Event). + +handle_msg({'$gen_call', From, Msg}, GS2State = #gs2_state { mod = Mod, + state = State, + name = Name, + debug = Debug }) -> case catch Mod:handle_call(Msg, From, State) of - {reply, Reply, NState} -> - Debug1 = reply(Name, From, Reply, NState, Debug), - loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, - Debug1); - {reply, Reply, NState, Time1} -> - Debug1 = reply(Name, From, Reply, NState, Debug), - loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); - {noreply, NState} -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, - {noreply, NState}), - loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, - Debug1); - {noreply, NState, Time1} -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, - {noreply, NState}), - loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); - {stop, Reason, Reply, NState} -> - {'EXIT', R} = - (catch terminate(Reason, Name, Msg, Mod, NState, Debug)), - reply(Name, From, Reply, NState, Debug), - exit(R); - Other -> - handle_common_reply(Other, Parent, Name, Msg, Mod, State, - TimeoutState, Queue, Debug) + {reply, Reply, NState} -> + Debug1 = common_reply(Name, From, Reply, NState, Debug), + loop(GS2State #gs2_state { state = NState, + time = infinity, + debug = Debug1 }); + {reply, Reply, NState, Time1} -> + Debug1 = common_reply(Name, From, Reply, NState, Debug), + loop(GS2State #gs2_state { state = NState, + time = Time1, + debug = Debug1}); + {noreply, NState} -> + Debug1 = common_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(GS2State #gs2_state {state = NState, + time = infinity, + debug = Debug1}); + {noreply, NState, Time1} -> + Debug1 = common_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(GS2State #gs2_state {state = NState, + time = Time1, + debug = Debug1}); + {stop, Reason, Reply, NState} -> + {'EXIT', R} = + (catch terminate(Reason, Msg, + GS2State #gs2_state { state = NState })), + reply(Name, From, Reply, NState, Debug), + exit(R); + Other -> + handle_common_reply(Other, Msg, GS2State) end; -handle_msg(Msg, - Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> +handle_msg(Msg, GS2State = #gs2_state { mod = Mod, state = State }) -> Reply = (catch dispatch(Msg, Mod, State)), - handle_common_reply(Reply, Parent, Name, Msg, Mod, State, - TimeoutState, Queue, Debug). + handle_common_reply(Reply, Msg, GS2State). -handle_common_reply(Reply, Parent, Name, Msg, Mod, State, - TimeoutState, Queue) -> +handle_common_reply(Reply, Msg, GS2State = #gs2_state { name = Name, + debug = Debug}) -> case Reply of - {noreply, NState} -> - loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); - {noreply, NState, Time1} -> - loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); + {noreply, NState} -> + Debug1 = common_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(GS2State #gs2_state { state = NState, + time = infinity, + debug = Debug1 }); + {noreply, NState, Time1} -> + Debug1 = common_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(GS2State #gs2_state { state = NState, + time = Time1, + debug = Debug1 }); _ -> - handle_common_termination(Reply, Name, Msg, Mod, State, []) + handle_common_termination(Reply, Msg, GS2State) end. -handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue, - Debug) -> +handle_common_termination(Reply, Msg, GS2State) -> case Reply of - {noreply, NState} -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, - {noreply, NState}), - loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, - Debug1); - {noreply, NState, Time1} -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, - {noreply, NState}), - loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); + {stop, Reason, NState} -> + terminate(Reason, Msg, GS2State #gs2_state { state = NState }); + {'EXIT', What} -> + terminate(What, Msg, GS2State); _ -> - handle_common_termination(Reply, Name, Msg, Mod, State, Debug) - end. - -handle_common_termination(Reply, Name, Msg, Mod, State, Debug) -> - case Reply of - {stop, Reason, NState} -> - terminate(Reason, Name, Msg, Mod, NState, Debug); - {'EXIT', What} -> - terminate(What, Name, Msg, Mod, State, Debug); - _ -> - terminate({bad_return_value, Reply}, Name, Msg, Mod, State, Debug) + terminate({bad_return_value, Reply}, Msg, GS2State) end. reply(Name, {To, Tag}, Reply, State, Debug) -> reply({To, Tag}, Reply), - sys:handle_debug(Debug, {?MODULE, print_event}, Name, - {out, Reply, To, State} ). + sys:handle_debug( + Debug, {?MODULE, print_event}, Name, {out, Reply, To, State}). %%----------------------------------------------------------------- %% Callback functions for system messages handling. %%----------------------------------------------------------------- -system_continue(Parent, Debug, [Name, State, Mod, Time, TimeoutState, Queue]) -> - loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug). +system_continue(Parent, Debug, GS2State) -> + loop(GS2State #gs2_state { parent = Parent, debug = Debug }). -ifdef(use_specs). -spec system_terminate(_, _, _, [_]) -> no_return(). -endif. -system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, - _TimeoutState, _Queue]) -> - terminate(Reason, Name, [], Mod, State, Debug). +system_terminate(Reason, _Parent, Debug, GS2State) -> + terminate(Reason, [], GS2State #gs2_state { debug = Debug }). -system_code_change([Name, State, Mod, Time, TimeoutState, Queue], _Module, - OldVsn, Extra) -> +system_code_change(GS2State = #gs2_state { mod = Mod, + state = State }, + _Module, OldVsn, Extra) -> case catch Mod:code_change(OldVsn, State, Extra) of - {ok, NewState} -> - {ok, [Name, NewState, Mod, Time, TimeoutState, Queue]}; - Else -> + {ok, NewState} -> + NewGS2State = find_prioritisers( + GS2State #gs2_state { state = NewState }), + {ok, [NewGS2State]}; + Else -> Else end. @@ -951,18 +928,18 @@ system_code_change([Name, State, Mod, Time, TimeoutState, Queue], _Module, %%----------------------------------------------------------------- print_event(Dev, {in, Msg}, Name) -> case Msg of - {'$gen_call', {From, _Tag}, Call} -> - io:format(Dev, "*DBG* ~p got call ~p from ~w~n", - [Name, Call, From]); - {'$gen_cast', Cast} -> - io:format(Dev, "*DBG* ~p got cast ~p~n", - [Name, Cast]); - _ -> - io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg]) + {'$gen_call', {From, _Tag}, Call} -> + io:format(Dev, "*DBG* ~p got call ~p from ~w~n", + [Name, Call, From]); + {'$gen_cast', Cast} -> + io:format(Dev, "*DBG* ~p got cast ~p~n", + [Name, Cast]); + _ -> + io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg]) end; print_event(Dev, {out, Msg, To, State}, Name) -> - io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n", - [Name, Msg, To, State]); + io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n", + [Name, Msg, To, State]); print_event(Dev, {noreply, State}, Name) -> io:format(Dev, "*DBG* ~p new state ~w~n", [Name, State]); print_event(Dev, Event, Name) -> @@ -973,23 +950,26 @@ print_event(Dev, Event, Name) -> %%% Terminate the server. %%% --------------------------------------------------- -terminate(Reason, Name, Msg, Mod, State, Debug) -> +terminate(Reason, Msg, #gs2_state { name = Name, + mod = Mod, + state = State, + debug = Debug }) -> case catch Mod:terminate(Reason, State) of - {'EXIT', R} -> - error_info(R, Reason, Name, Msg, State, Debug), - exit(R); - _ -> - case Reason of - normal -> - exit(normal); - shutdown -> - exit(shutdown); - {shutdown,_}=Shutdown -> - exit(Shutdown); - _ -> - error_info(Reason, undefined, Name, Msg, State, Debug), - exit(Reason) - end + {'EXIT', R} -> + error_info(R, Reason, Name, Msg, State, Debug), + exit(R); + _ -> + case Reason of + normal -> + exit(normal); + shutdown -> + exit(shutdown); + {shutdown,_}=Shutdown -> + exit(Shutdown); + _ -> + error_info(Reason, undefined, Name, Msg, State, Debug), + exit(Reason) + end end. error_info(_Reason, _RootCause, application_controller, _Msg, _State, _Debug) -> @@ -1038,74 +1018,109 @@ opt(_, []) -> debug_options(Name, Opts) -> case opt(debug, Opts) of - {ok, Options} -> dbg_options(Name, Options); - _ -> dbg_options(Name, []) + {ok, Options} -> dbg_options(Name, Options); + _ -> dbg_options(Name, []) end. dbg_options(Name, []) -> - Opts = - case init:get_argument(generic_debug) of - error -> - []; - _ -> - [log, statistics] - end, + Opts = + case init:get_argument(generic_debug) of + error -> + []; + _ -> + [log, statistics] + end, dbg_opts(Name, Opts); dbg_options(Name, Opts) -> dbg_opts(Name, Opts). dbg_opts(Name, Opts) -> case catch sys:debug_options(Opts) of - {'EXIT',_} -> - format("~p: ignoring erroneous debug options - ~p~n", - [Name, Opts]), - []; - Dbg -> - Dbg + {'EXIT',_} -> + format("~p: ignoring erroneous debug options - ~p~n", + [Name, Opts]), + []; + Dbg -> + Dbg end. get_proc_name(Pid) when is_pid(Pid) -> Pid; get_proc_name({local, Name}) -> case process_info(self(), registered_name) of - {registered_name, Name} -> - Name; - {registered_name, _Name} -> - exit(process_not_registered); - [] -> - exit(process_not_registered) - end; + {registered_name, Name} -> + Name; + {registered_name, _Name} -> + exit(process_not_registered); + [] -> + exit(process_not_registered) + end; get_proc_name({global, Name}) -> case global:safe_whereis_name(Name) of - undefined -> - exit(process_not_registered_globally); - Pid when Pid =:= self() -> - Name; - _Pid -> - exit(process_not_registered_globally) + undefined -> + exit(process_not_registered_globally); + Pid when Pid =:= self() -> + Name; + _Pid -> + exit(process_not_registered_globally) end. get_parent() -> case get('$ancestors') of - [Parent | _] when is_pid(Parent)-> + [Parent | _] when is_pid(Parent)-> Parent; [Parent | _] when is_atom(Parent)-> name_to_pid(Parent); - _ -> - exit(process_was_not_started_by_proc_lib) + _ -> + exit(process_was_not_started_by_proc_lib) end. name_to_pid(Name) -> case whereis(Name) of - undefined -> - case global:safe_whereis_name(Name) of - undefined -> - exit(could_not_find_registerd_name); - Pid -> - Pid - end; - Pid -> - Pid + undefined -> + case global:safe_whereis_name(Name) of + undefined -> + exit(could_not_find_registerd_name); + Pid -> + Pid + end; + Pid -> + Pid + end. + +find_prioritisers(GS2State = #gs2_state { mod = Mod }) -> + PrioriCall = function_exported_or_default( + Mod, 'prioritise_call', 3, + fun (_Msg, _From, _State) -> 0 end), + PrioriCast = function_exported_or_default(Mod, 'prioritise_cast', 2, + fun (_Msg, _State) -> 0 end), + PrioriInfo = function_exported_or_default(Mod, 'prioritise_info', 2, + fun (_Msg, _State) -> 0 end), + GS2State #gs2_state { prioritise_call = PrioriCall, + prioritise_cast = PrioriCast, + prioritise_info = PrioriInfo }. + +function_exported_or_default(Mod, Fun, Arity, Default) -> + case erlang:function_exported(Mod, Fun, Arity) of + true -> case Arity of + 2 -> fun (Msg, GS2State = #gs2_state { state = State }) -> + case catch Mod:Fun(Msg, State) of + Res when is_integer(Res) -> + Res; + Err -> + handle_common_termination(Err, Msg, GS2State) + end + end; + 3 -> fun (Msg, From, GS2State = #gs2_state { state = State }) -> + case catch Mod:Fun(Msg, From, State) of + Res when is_integer(Res) -> + Res; + Err -> + handle_common_termination(Err, Msg, GS2State) + end + end + end; + false -> Default end. %%----------------------------------------------------------------- @@ -1115,25 +1130,23 @@ format_status(Opt, StatusData) -> [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time, _TimeoutState, Queue]] = StatusData, NameTag = if is_pid(Name) -> - pid_to_list(Name); - is_atom(Name) -> - Name - end, + pid_to_list(Name); + is_atom(Name) -> + Name + end, Header = lists:concat(["Status for generic server ", NameTag]), Log = sys:get_debug(log, Debug, []), - Specfic = - case erlang:function_exported(Mod, format_status, 2) of - true -> - case catch Mod:format_status(Opt, [PDict, State]) of - {'EXIT', _} -> [{data, [{"State", State}]}]; - Else -> Else - end; - _ -> - [{data, [{"State", State}]}] - end, + Specfic = + case erlang:function_exported(Mod, format_status, 2) of + true -> case catch Mod:format_status(Opt, [PDict, State]) of + {'EXIT', _} -> [{data, [{"State", State}]}]; + Else -> Else + end; + _ -> [{data, [{"State", State}]}] + end, [{header, Header}, {data, [{"Status", SysState}, - {"Parent", Parent}, - {"Logged events", Log}, + {"Parent", Parent}, + {"Logged events", Log}, {"Queued messages", priority_queue:to_list(Queue)}]} | Specfic]. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 7116653c..3e677c38 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -332,10 +332,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys(). map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> - delegate_pcall(QPid, 9, info, infinity). + delegate_call(QPid, info, infinity). info(#amqqueue{ pid = QPid }, Items) -> - case delegate_pcall(QPid, 9, {info, Items}, infinity) of + case delegate_call(QPid, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -345,7 +345,7 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). consumers(#amqqueue{ pid = QPid }) -> - delegate_pcall(QPid, 9, consumers, infinity). + delegate_call(QPid, consumers, infinity). consumers_all(VHostPath) -> lists:concat( @@ -357,7 +357,7 @@ consumers_all(VHostPath) -> stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity). emit_stats(#amqqueue{pid = QPid}) -> - delegate_pcast(QPid, 7, emit_stats). + delegate_cast(QPid, emit_stats). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity). @@ -380,10 +380,10 @@ requeue(QPid, MsgIds, ChPid) -> delegate_call(QPid, {requeue, MsgIds, ChPid}, infinity). ack(QPid, Txn, MsgIds, ChPid) -> - delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). + delegate_cast(QPid, {ack, Txn, MsgIds, ChPid}). reject(QPid, MsgIds, Requeue, ChPid) -> - delegate_pcast(QPid, 7, {reject, MsgIds, Requeue, ChPid}). + delegate_cast(QPid, {reject, MsgIds, Requeue, ChPid}). commit_all(QPids, Txn, ChPid) -> safe_delegate_call_ok( @@ -419,10 +419,10 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> infinity). notify_sent(QPid, ChPid) -> - delegate_pcast(QPid, 7, {notify_sent, ChPid}). + delegate_cast(QPid, {notify_sent, ChPid}). unblock(QPid, ChPid) -> - delegate_pcast(QPid, 7, {unblock, ChPid}). + delegate_cast(QPid, {unblock, ChPid}). flush_all(QPids, ChPid) -> delegate:invoke_no_result( @@ -452,20 +452,19 @@ internal_delete(QueueName) -> end. maybe_run_queue_via_backing_queue(QPid, Fun) -> - gen_server2:pcall(QPid, 6, {maybe_run_queue_via_backing_queue, Fun}, - infinity). + gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity). update_ram_duration(QPid) -> - gen_server2:pcast(QPid, 8, update_ram_duration). + gen_server2:cast(QPid, update_ram_duration). set_ram_duration_target(QPid, Duration) -> - gen_server2:pcast(QPid, 8, {set_ram_duration_target, Duration}). + gen_server2:cast(QPid, {set_ram_duration_target, Duration}). set_maximum_since_use(QPid, Age) -> - gen_server2:pcast(QPid, 8, {set_maximum_since_use, Age}). + gen_server2:cast(QPid, {set_maximum_since_use, Age}). maybe_expire(QPid) -> - gen_server2:pcast(QPid, 8, maybe_expire). + gen_server2:cast(QPid, maybe_expire). on_node_down(Node) -> [Hook() || @@ -505,11 +504,6 @@ safe_delegate_call_ok(F, Pids) -> delegate_call(Pid, Msg, Timeout) -> delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end). -delegate_pcall(Pid, Pri, Msg, Timeout) -> - delegate:invoke(Pid, - fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end). - -delegate_pcast(Pid, Pri, Msg) -> - delegate:invoke_no_result(Pid, - fun (P) -> gen_server2:pcast(P, Pri, Msg) end). +delegate_cast(Pid, Msg) -> + delegate:invoke(Pid, fun (P) -> gen_server2:cast(P, Msg) end). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 08495862..91877efb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -42,7 +42,8 @@ -export([start_link/1, info_keys/0]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2, handle_pre_hibernate/1]). + handle_info/2, handle_pre_hibernate/1, prioritise_call/3, + prioritise_cast/2]). -import(queue). -import(erlang). @@ -201,7 +202,7 @@ next_state(State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = ensure_rate_timer(State), State2 = ensure_stats_timer(State1), - case BQ:needs_idle_timeout(BQS)of + case BQ:needs_idle_timeout(BQS) of true -> {ensure_sync_timer(State2), 0}; false -> {stop_sync_timer(State2), hibernate} end. @@ -589,6 +590,29 @@ emit_stats(State) -> %--------------------------------------------------------------------------- +prioritise_call(Msg, _From, _State) -> + case Msg of + info -> 9; + {info, _Items} -> 9; + consumers -> 9; + {maybe_run_queue_via_backing_queue, _Fun} -> 6; + _ -> 0 + end. + +prioritise_cast(Msg, _State) -> + case Msg of + update_ram_duration -> 8; + {set_ram_duration_target, _Duration} -> 8; + {set_maximum_since_use, _Age} -> 8; + maybe_expire -> 8; + emit_stats -> 7; + {ack, _Txn, _MsgIds, _ChPid} -> 7; + {reject, _MsgIds, _Requeue, _ChPid} -> 7; + {notify_sent, _ChPid} -> 7; + {unblock, _ChPid} -> 7; + _ -> 0 + end. + handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> declare(Recover, From, State); diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index bb29580f..19150fa9 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -129,8 +129,8 @@ add(Binding, InnerFun) -> E end end) of - {new, Exchange = #exchange{ type = Type }, B} -> - ok = (type_to_module(Type)):add_binding(Exchange, B), + {new, X = #exchange{ type = Type }, B} -> + ok = (type_to_module(Type)):add_binding(X, B), rabbit_event:notify(binding_created, info(B)); {existing, _, _} -> ok; @@ -181,8 +181,8 @@ list(VHostPath) -> [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, Route)]. -list_for_exchange(ExchangeName) -> - Route = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, +list_for_exchange(XName) -> + Route = #route{binding = #binding{exchange_name = XName, _ = '_'}}, [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, Route)]. @@ -192,8 +192,8 @@ list_for_queue(QueueName) -> mnesia:dirty_match_object(rabbit_reverse_route, reverse_route(Route))]. -list_for_exchange_and_queue(ExchangeName, QueueName) -> - Route = #route{binding = #binding{exchange_name = ExchangeName, +list_for_exchange_and_queue(XName, QueueName) -> + Route = #route{binding = #binding{exchange_name = XName, queue_name = QueueName, _ = '_'}}, [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, @@ -222,14 +222,14 @@ info_all(VHostPath) -> map(VHostPath, fun (B) -> info(B) end). info_all(VHostPath, Items) -> map(VHostPath, fun (B) -> info(B, Items) end). -has_for_exchange(ExchangeName) -> - Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, +has_for_exchange(XName) -> + Match = #route{binding = #binding{exchange_name = XName, _ = '_'}}, %% we need to check for durable routes here too in case a bunch of %% routes to durable queues have been removed temporarily as a %% result of a node failure contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match). -remove_for_exchange(ExchangeName) -> +remove_for_exchange(XName) -> [begin ok = mnesia:delete_object(rabbit_reverse_route, reverse_route(Route), write), @@ -237,7 +237,7 @@ remove_for_exchange(ExchangeName) -> Route#route.binding end || Route <- mnesia:match_object( rabbit_route, - #route{binding = #binding{exchange_name = ExchangeName, + #route{binding = #binding{exchange_name = XName, _ = '_'}}, write)]. @@ -249,11 +249,11 @@ remove_transient_for_queue(QueueName) -> %%---------------------------------------------------------------------------- -binding_action(Binding = #binding{exchange_name = ExchangeName, +binding_action(Binding = #binding{exchange_name = XName, queue_name = QueueName, args = Arguments}, Fun) -> call_with_exchange_and_queue( - ExchangeName, QueueName, + XName, QueueName, fun (X, Q) -> SortedArgs = rabbit_misc:sort_field_table(Arguments), Fun(X, Q, Binding#binding{args = SortedArgs}) @@ -270,10 +270,10 @@ sync_binding(Binding, Durable, Fun) -> ok = Fun(rabbit_reverse_route, ReverseRoute, write), ok. -call_with_exchange_and_queue(Exchange, Queue, Fun) -> +call_with_exchange_and_queue(XName, QueueName, Fun) -> rabbit_misc:execute_mnesia_transaction( - fun () -> case {mnesia:read({rabbit_exchange, Exchange}), - mnesia:read({rabbit_queue, Queue})} of + fun () -> case {mnesia:read({rabbit_exchange, XName}), + mnesia:read({rabbit_queue, QueueName})} of {[X], [Q]} -> Fun(X, Q); {[ ], [_]} -> {error, exchange_not_found}; {[_], [ ]} -> {error, queue_not_found}; @@ -327,16 +327,15 @@ remove_for_queue(QueueName, FwdDeleteFun) -> group_bindings_and_auto_delete([], Acc) -> Acc; group_bindings_and_auto_delete( - [B = #binding{exchange_name = ExchangeName} | Bs], Acc) -> - group_bindings_and_auto_delete(ExchangeName, Bs, [B], Acc). + [B = #binding{exchange_name = XName} | Bs], Acc) -> + group_bindings_and_auto_delete(XName, Bs, [B], Acc). group_bindings_and_auto_delete( - ExchangeName, [B = #binding{exchange_name = ExchangeName} | Bs], - Bindings, Acc) -> - group_bindings_and_auto_delete(ExchangeName, Bs, [B | Bindings], Acc); -group_bindings_and_auto_delete(ExchangeName, Removed, Bindings, Acc) -> - %% either Removed is [], or its head has a non-matching ExchangeName - [X] = mnesia:read({rabbit_exchange, ExchangeName}), + XName, [B = #binding{exchange_name = XName} | Bs], Bindings, Acc) -> + group_bindings_and_auto_delete(XName, Bs, [B | Bindings], Acc); +group_bindings_and_auto_delete(XName, Removed, Bindings, Acc) -> + %% either Removed is [], or its head has a non-matching XName + [X] = mnesia:read({rabbit_exchange, XName}), NewAcc = [{{rabbit_exchange:maybe_auto_delete(X), X}, Bindings} | Acc], group_bindings_and_auto_delete(Removed, NewAcc). @@ -359,20 +358,20 @@ reverse_route(#route{binding = Binding}) -> reverse_route(#reverse_route{reverse_binding = Binding}) -> #route{binding = reverse_binding(Binding)}. -reverse_binding(#reverse_binding{exchange_name = Exchange, - queue_name = Queue, +reverse_binding(#reverse_binding{exchange_name = XName, + queue_name = QueueName, key = Key, args = Args}) -> - #binding{exchange_name = Exchange, - queue_name = Queue, + #binding{exchange_name = XName, + queue_name = QueueName, key = Key, args = Args}; -reverse_binding(#binding{exchange_name = Exchange, - queue_name = Queue, +reverse_binding(#binding{exchange_name = XName, + queue_name = QueueName, key = Key, args = Args}) -> - #reverse_binding{exchange_name = Exchange, - queue_name = Queue, + #reverse_binding{exchange_name = XName, + queue_name = QueueName, key = Key, args = Args}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 174eab40..f19f98d2 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -41,7 +41,8 @@ -export([emit_stats/1, flush/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2, handle_pre_hibernate/1]). + handle_info/2, handle_pre_hibernate/1, prioritise_call/3, + prioritise_cast/2]). -record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, start_limiter_fun, transaction_id, tx_participants, next_tag, @@ -131,10 +132,10 @@ list() -> info_keys() -> ?INFO_KEYS. info(Pid) -> - gen_server2:pcall(Pid, 9, info, infinity). + gen_server2:call(Pid, info, infinity). info(Pid, Items) -> - case gen_server2:pcall(Pid, 9, {info, Items}, infinity) of + case gen_server2:call(Pid, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -146,7 +147,7 @@ info_all(Items) -> rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()). emit_stats(Pid) -> - gen_server2:pcast(Pid, 7, emit_stats). + gen_server2:cast(Pid, emit_stats). flush(Pid) -> gen_server2:call(Pid, flush). @@ -179,6 +180,19 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. +prioritise_call(Msg, _From, _State) -> + case Msg of + info -> 9; + {info, _Items} -> 9; + _ -> 0 + end. + +prioritise_cast(Msg, _State) -> + case Msg of + emit_stats -> 7; + _ -> 0 + end. + handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State); diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl index d1938805..21c39780 100644 --- a/src/rabbit_channel_sup_sup.erl +++ b/src/rabbit_channel_sup_sup.erl @@ -53,9 +53,7 @@ start_link() -> supervisor2:start_link(?MODULE, []). start_channel(Pid, Args) -> - {ok, ChSupPid, _} = Result = supervisor2:start_child(Pid, [Args]), - link(ChSupPid), - Result. + supervisor2:start_child(Pid, [Args]). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 40bee25f..2a19d5b1 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -92,15 +92,15 @@ -define(INFO_KEYS, [name, type, durable, auto_delete, arguments]). recover() -> - Exs = rabbit_misc:table_fold( - fun (Exchange, Acc) -> - ok = mnesia:write(rabbit_exchange, Exchange, write), - [Exchange | Acc] - end, [], rabbit_durable_exchange), + Xs = rabbit_misc:table_fold( + fun (X, Acc) -> + ok = mnesia:write(rabbit_exchange, X, write), + [X | Acc] + end, [], rabbit_durable_exchange), Bs = rabbit_binding:recover(), recover_with_bindings( lists:keysort(#binding.exchange_name, Bs), - lists:keysort(#exchange.name, Exs), []). + lists:keysort(#exchange.name, Xs), []). recover_with_bindings([B = #binding{exchange_name = Name} | Rest], Xs = [#exchange{name = Name} | _], @@ -112,30 +112,30 @@ recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) -> recover_with_bindings([], [], []) -> ok. -declare(ExchangeName, Type, Durable, AutoDelete, Args) -> - Exchange = #exchange{name = ExchangeName, - type = Type, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args}, +declare(XName, Type, Durable, AutoDelete, Args) -> + X = #exchange{name = XName, + type = Type, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args}, %% We want to upset things if it isn't ok; this is different from %% the other hooks invocations, where we tend to ignore the return %% value. TypeModule = type_to_module(Type), - ok = TypeModule:validate(Exchange), + ok = TypeModule:validate(X), case rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({rabbit_exchange, ExchangeName}) of + case mnesia:wread({rabbit_exchange, XName}) of [] -> - ok = mnesia:write(rabbit_exchange, Exchange, write), + ok = mnesia:write(rabbit_exchange, X, write), ok = case Durable of true -> mnesia:write(rabbit_durable_exchange, - Exchange, write); + X, write); false -> ok end, - {new, Exchange}; + {new, X}; [ExistingX] -> {existing, ExistingX} end @@ -257,20 +257,20 @@ publish(X = #exchange{type = Type}, Seen, Delivery) -> R end. -call_with_exchange(Exchange, Fun) -> +call_with_exchange(XName, Fun) -> rabbit_misc:execute_mnesia_transaction( - fun () -> case mnesia:read({rabbit_exchange, Exchange}) of + fun () -> case mnesia:read({rabbit_exchange, XName}) of [] -> {error, not_found}; [X] -> Fun(X) end end). -delete(ExchangeName, IfUnused) -> +delete(XName, IfUnused) -> Fun = case IfUnused of true -> fun conditional_delete/1; false -> fun unconditional_delete/1 end, - case call_with_exchange(ExchangeName, Fun) of + case call_with_exchange(XName, Fun) of {deleted, X = #exchange{type = Type}, Bs} -> (type_to_module(Type)):delete(X, Bs), ok; @@ -280,21 +280,21 @@ delete(ExchangeName, IfUnused) -> maybe_auto_delete(#exchange{auto_delete = false}) -> not_deleted; -maybe_auto_delete(#exchange{auto_delete = true} = Exchange) -> - case conditional_delete(Exchange) of - {error, in_use} -> not_deleted; - {deleted, Exchange, []} -> auto_deleted +maybe_auto_delete(#exchange{auto_delete = true} = X) -> + case conditional_delete(X) of + {error, in_use} -> not_deleted; + {deleted, X, []} -> auto_deleted end. -conditional_delete(Exchange = #exchange{name = ExchangeName}) -> - case rabbit_binding:has_for_exchange(ExchangeName) of - false -> unconditional_delete(Exchange); +conditional_delete(X = #exchange{name = XName}) -> + case rabbit_binding:has_for_exchange(XName) of + false -> unconditional_delete(X); true -> {error, in_use} end. -unconditional_delete(Exchange = #exchange{name = ExchangeName}) -> - Bindings = rabbit_binding:remove_for_exchange(ExchangeName), - ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}), - ok = mnesia:delete({rabbit_exchange, ExchangeName}), - rabbit_event:notify(exchange_deleted, [{name, ExchangeName}]), - {deleted, Exchange, Bindings}. +unconditional_delete(X = #exchange{name = XName}) -> + Bindings = rabbit_binding:remove_for_exchange(XName), + ok = mnesia:delete({rabbit_durable_exchange, XName}), + ok = mnesia:delete({rabbit_exchange, XName}), + rabbit_event:notify(exchange_deleted, [{name, XName}]), + {deleted, X, Bindings}. diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index da7078f1..c323d7ce 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -34,7 +34,7 @@ -behaviour(gen_server2). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2]). + handle_info/2, prioritise_call/3]). -export([start_link/2]). -export([limit/2, can_send/3, ack/2, register/2, unregister/2]). -export([get_limit/1, block/1, unblock/1]). @@ -107,7 +107,7 @@ get_limit(undefined) -> get_limit(Pid) -> rabbit_misc:with_exit_handler( fun () -> 0 end, - fun () -> gen_server2:pcall(Pid, 9, get_limit, infinity) end). + fun () -> gen_server2:call(Pid, get_limit, infinity) end). block(undefined) -> ok; @@ -126,6 +126,9 @@ unblock(LimiterPid) -> init([ChPid, UnackedMsgCount]) -> {ok, #lim{ch_pid = ChPid, volume = UnackedMsgCount}}. +prioritise_call(get_limit, _From, _State) -> 9; +prioritise_call(_Msg, _From, _State) -> 0. + handle_call({can_send, _QPid, _AckRequired}, _From, State = #lim{blocked = true}) -> {reply, false, State}; diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 18810833..bbecbfe2 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -40,7 +40,7 @@ -export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal -export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). + terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]). %%---------------------------------------------------------------------------- @@ -322,8 +322,8 @@ read(Server, Guid, %% 2. Check the cur file cache case ets:lookup(CurFileCacheEts, Guid) of [] -> - Defer = fun() -> {gen_server2:pcall( - Server, 2, {read, Guid}, infinity), + Defer = fun() -> {gen_server2:call( + Server, {read, Guid}, infinity), CState} end, case index_lookup(Guid, CState) of not_found -> Defer(); @@ -345,18 +345,18 @@ remove(Server, Guids) -> gen_server2:cast(Server, {remove, Guids}). release(_Server, []) -> ok; release(Server, Guids) -> gen_server2:cast(Server, {release, Guids}). sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}). -sync(Server) -> gen_server2:pcast(Server, 8, sync). %% internal +sync(Server) -> gen_server2:cast(Server, sync). %% internal gc_done(Server, Reclaimed, Source, Destination) -> - gen_server2:pcast(Server, 8, {gc_done, Reclaimed, Source, Destination}). + gen_server2:cast(Server, {gc_done, Reclaimed, Source, Destination}). set_maximum_since_use(Server, Age) -> - gen_server2:pcast(Server, 8, {set_maximum_since_use, Age}). + gen_server2:cast(Server, {set_maximum_since_use, Age}). client_init(Server, Ref) -> {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = - gen_server2:pcall(Server, 7, {new_client_state, Ref}, infinity), + gen_server2:call(Server, {new_client_state, Ref}, infinity), #client_msstate { file_handle_cache = dict:new(), index_state = IState, index_module = IModule, @@ -376,7 +376,7 @@ client_delete_and_terminate(CState, Server, Ref) -> ok = gen_server2:cast(Server, {client_delete, Ref}). successfully_recovered_state(Server) -> - gen_server2:pcall(Server, 7, successfully_recovered_state, infinity). + gen_server2:call(Server, successfully_recovered_state, infinity). %%---------------------------------------------------------------------------- %% Client-side-only helpers @@ -575,6 +575,22 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. +prioritise_call(Msg, _From, _State) -> + case Msg of + {new_client_state, _Ref} -> 7; + successfully_recovered_state -> 7; + {read, _Guid} -> 2; + _ -> 0 + end. + +prioritise_cast(Msg, _State) -> + case Msg of + sync -> 8; + {gc_done, _Reclaimed, _Source, _Destination} -> 8; + {set_maximum_since_use, _Age} -> 8; + _ -> 0 + end. + handle_call({read, Guid}, From, State) -> State1 = read_message(Guid, From, State), noreply(State1); diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index c7948b7e..a7855bbf 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -38,7 +38,7 @@ -export([set_maximum_since_use/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). + terminate/2, code_change/3, prioritise_cast/2]). -record(gcstate, {dir, @@ -81,7 +81,7 @@ stop(Server) -> gen_server2:call(Server, stop, infinity). set_maximum_since_use(Pid, Age) -> - gen_server2:pcast(Pid, 8, {set_maximum_since_use, Age}). + gen_server2:cast(Pid, {set_maximum_since_use, Age}). %%---------------------------------------------------------------------------- @@ -97,6 +97,9 @@ init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) -> hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. +prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8; +prioritise_cast(_Msg, _State) -> 0. + handle_call(stop, _From, State) -> {stop, normal, ok, State}. diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index c7a5a600..5cfd6a5c 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -310,8 +310,8 @@ kill_wait(Pid, TimeLeft, Forceful) -> is_dead(Pid) -> PidS = integer_to_list(Pid), with_os([{unix, fun () -> - Res = os:cmd("ps --no-headers --pid " ++ PidS), - Res == "" + system("kill -0 " ++ PidS + ++ " >/dev/null 2>&1") /= 0 end}, {win32, fun () -> Res = os:cmd("tasklist /nh /fi \"pid eq " ++ @@ -322,6 +322,16 @@ is_dead(Pid) -> end end}]). +% Like system(3) +system(Cmd) -> + ShCmd = "sh -c '" ++ escape_quotes(Cmd) ++ "'", + Port = erlang:open_port({spawn, ShCmd}, [exit_status,nouse_stdio]), + receive {Port, {exit_status, Status}} -> Status end. + +% Escape the quotes in a shell command so that it can be used in "sh -c 'cmd'" +escape_quotes(Cmd) -> + lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)). + call_all_nodes(Func) -> case read_pids_file() of [] -> throw(no_nodes_running); diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 08272afe..6dbd54d2 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -46,6 +46,8 @@ -include("rabbit.hrl"). -include_lib("kernel/include/inet.hrl"). +-include_lib("ssl/src/ssl_record.hrl"). + -define(RABBIT_TCP_OPTS, [ binary, @@ -108,6 +110,8 @@ boot_ssl() -> {ok, SslListeners} -> ok = rabbit_misc:start_applications([crypto, public_key, ssl]), {ok, SslOptsConfig} = application:get_env(ssl_options), + % unknown_ca errors are silently ignored prior to R14B unless we + % supply this verify_fun - remove when at least R14B is required SslOpts = case proplists:get_value(verify, SslOptsConfig, verify_none) of verify_none -> SslOptsConfig; @@ -116,7 +120,26 @@ boot_ssl() -> end} | SslOptsConfig] end, - [start_ssl_listener(Host, Port, SslOpts) || {Host, Port} <- SslListeners], + % In R13B04 and R14A (at least), rc4 is incorrectly implemented. + CipherSuites = proplists:get_value(ciphers, + SslOpts, + ssl:cipher_suites()), + FilteredCipherSuites = + [C || C <- CipherSuites, + begin + SuiteCode = + if is_tuple(C) -> ssl_cipher:suite(C); + is_list(C) -> ssl_cipher:openssl_suite(C) + end, + SP = ssl_cipher:security_parameters( + SuiteCode, + #security_parameters{}), + SP#security_parameters.bulk_cipher_algorithm =/= ?RC4 + end], + SslOpts1 = [{ciphers, FilteredCipherSuites} + | [{K, V} || {K, V} <- SslOpts, K =/= ciphers]], + [start_ssl_listener(Host, Port, SslOpts1) + || {Host, Port} <- SslListeners], ok end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index a21961b5..252f81a3 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -339,7 +339,7 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> throw(E); {channel_exit, ChannelOrFrPid, Reason} -> mainloop(Deb, handle_channel_exit(ChannelOrFrPid, Reason, State)); - {'EXIT', ChSupPid, Reason} -> + {'DOWN', _MRef, process, ChSupPid, Reason} -> mainloop(Deb, handle_dependent_exit(ChSupPid, Reason, State)); terminate_connection -> State; @@ -489,7 +489,7 @@ wait_for_channel_termination(0, TimerRef) -> wait_for_channel_termination(N, TimerRef) -> receive - {'EXIT', ChSupPid, Reason} -> + {'DOWN', _MRef, process, ChSupPid, Reason} -> case channel_cleanup(ChSupPid) of undefined -> exit({abnormal_dependent_exit, ChSupPid, Reason}); @@ -867,6 +867,7 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> rabbit_channel_sup_sup:start_channel( ChanSupSup, {Protocol, Sock, Channel, FrameMax, self(), Username, VHost, Collector}), + erlang:monitor(process, ChSupPid), put({channel, Channel}, {ch_fr_pid, ChFrPid}), put({ch_sup_pid, ChSupPid}, {{channel, Channel}, {ch_fr_pid, ChFrPid}}), put({ch_fr_pid, ChFrPid}, {channel, Channel}), diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index bfccb0da..bd57f737 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -85,10 +85,10 @@ deliver(QPids, Delivery) -> %% TODO: This causes a full scan for each entry with the same exchange match_bindings(Name, Match) -> Query = qlc:q([QName || #route{binding = Binding = #binding{ - exchange_name = ExchangeName, + exchange_name = XName, queue_name = QName}} <- mnesia:table(rabbit_route), - ExchangeName == Name, + XName == Name, Match(Binding)]), lookup_qpids(mnesia:async_dirty(fun qlc:e/1, [Query])). diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index 42049d50..f461a539 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -38,7 +38,7 @@ -export([set_maximum_since_use/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). + terminate/2, code_change/3, prioritise_cast/2]). %%---------------------------------------------------------------------------- @@ -71,7 +71,7 @@ submit_async(Pid, Fun) -> gen_server2:cast(Pid, {submit_async, Fun}). set_maximum_since_use(Pid, Age) -> - gen_server2:pcast(Pid, 8, {set_maximum_since_use, Age}). + gen_server2:cast(Pid, {set_maximum_since_use, Age}). run({M, F, A}) -> apply(M, F, A); @@ -88,6 +88,9 @@ init([WId]) -> {ok, WId, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. +prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8; +prioritise_cast(_Msg, _State) -> 0. + handle_call({submit, Fun}, From, WId) -> gen_server2:reply(From, run(Fun)), ok = worker_pool:idle(WId), |