diff options
author | Ben Hood <0x6e6562@gmail.com> | 2009-01-21 13:00:07 +0000 |
---|---|---|
committer | Ben Hood <0x6e6562@gmail.com> | 2009-01-21 13:00:07 +0000 |
commit | b8c32eaeef9d1c64d6e6a34e6cf9657a730cdc43 (patch) | |
tree | 516a796e2614d537271255d855fc12623dc2f8a2 | |
parent | 1193c9d00b661c430579e45a7ef9ecd64c0698b7 (diff) | |
parent | b98e158cdef0c5d794e0362b30e7dc529476982d (diff) | |
download | rabbitmq-server-b8c32eaeef9d1c64d6e6a34e6cf9657a730cdc43.tar.gz |
Merged v1_5_1 into default
-rw-r--r-- | .hgignore | 1 | ||||
-rw-r--r-- | Makefile | 22 | ||||
-rw-r--r-- | docs/rabbitmqctl.1.pod | 3 | ||||
-rw-r--r-- | ebin/rabbit.app | 57 | ||||
-rw-r--r-- | ebin/rabbit_app.in | 20 | ||||
-rw-r--r-- | generate_app | 10 | ||||
-rw-r--r-- | src/buffering_proxy.erl | 108 | ||||
-rw-r--r-- | src/gen_server2.erl | 854 | ||||
-rw-r--r-- | src/rabbit.erl | 32 | ||||
-rw-r--r-- | src/rabbit_access_control.erl | 2 | ||||
-rw-r--r-- | src/rabbit_alarm.erl | 8 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 65 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 178 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 246 | ||||
-rw-r--r-- | src/rabbit_error_logger_file_h.erl | 2 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 135 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 195 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 32 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 4 | ||||
-rw-r--r-- | src/rabbit_router.erl | 4 | ||||
-rw-r--r-- | src/rabbit_sasl_report_file_h.erl | 2 |
21 files changed, 1548 insertions, 432 deletions
@@ -9,6 +9,7 @@ syntax: regexp ^include/rabbit_framing.hrl$ ^src/rabbit_framing.erl$ ^rabbit.plt$ +^ebin/rabbit.app$ ^packaging/RPMS/Fedora/(BUILD|RPMS|SOURCES|SPECS|SRPMS)$ ^packaging/debs/Debian/rabbitmq-server_.*\.(dsc|(diff|tar)\.gz|deb|changes)$ @@ -7,7 +7,8 @@ SOURCE_DIR=src EBIN_DIR=ebin INCLUDE_DIR=include SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) -TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam,$(SOURCES)) +BEAM_TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam,$(SOURCES)) +TARGETS=$(EBIN_DIR)/rabbit.app $(BEAM_TARGETS) WEB_URL=http://stage.rabbitmq.com/ MANPAGES=$(patsubst %.pod, %.gz, $(wildcard docs/*.[0-9].pod)) @@ -39,9 +40,15 @@ ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e #all: $(EBIN_DIR)/rabbit.boot all: $(TARGETS) -$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl +$(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app + escript generate_app $(EBIN_DIR) < $< > $@ + +$(EBIN_DIR)/gen_server2.beam: $(SOURCE_DIR)/gen_server2.erl erlc $(ERLC_OPTS) $< -# ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) $< + +$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl $(EBIN_DIR)/gen_server2.beam + erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< +# ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< $(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) $@ @@ -52,12 +59,12 @@ $(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script: $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.rel $(TARGETS) erl -noshell -eval 'systools:make_script("ebin/rabbit", [{path, ["ebin"]}]), halt().' -dialyze: $(TARGETS) +dialyze: $(BEAM_TARGETS) dialyzer -c $? clean: cleandb rm -f $(EBIN_DIR)/*.beam - rm -f $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script + rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc rm -f docs/*.[0-9].gz @@ -122,8 +129,13 @@ srcdist: distclean >> $(TARGET_SRC_DIR)/INSTALL cp README.in $(TARGET_SRC_DIR)/README elinks -dump -no-references -no-numbering $(WEB_URL)build-server.html \ +<<<<<<< local + >> $(TARGET_SRC_DIR)/BUILD + sed -i 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in +======= >> $(TARGET_SRC_DIR)/README sed -i 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit.app +>>>>>>> other cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/ cp codegen.py Makefile $(TARGET_SRC_DIR) diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod index d2cb0199..fd8918cd 100644 --- a/docs/rabbitmqctl.1.pod +++ b/docs/rabbitmqctl.1.pod @@ -157,9 +157,12 @@ messages_unacknowledged messages_uncommitted number of messages published in as yet uncommitted transactions +<<<<<<< local +======= messages sum of ready, unacknowledged and uncommitted messages +>>>>>>> other acks_uncommitted number of acknowledgements received in as yet uncommitted diff --git a/ebin/rabbit.app b/ebin/rabbit.app deleted file mode 100644 index 0d714fdf..00000000 --- a/ebin/rabbit.app +++ /dev/null @@ -1,57 +0,0 @@ -{application, rabbit, %% -*- erlang -*- - [{description, "RabbitMQ"}, - {id, "RabbitMQ"}, - {vsn, "%%VERSION%%"}, - {modules, [buffering_proxy, - rabbit_access_control, - rabbit_alarm, - rabbit_amqqueue, - rabbit_amqqueue_process, - rabbit_amqqueue_sup, - rabbit_binary_generator, - rabbit_binary_parser, - rabbit_channel, - rabbit_control, - rabbit, - rabbit_error_logger, - rabbit_error_logger_file_h, - rabbit_exchange, - rabbit_framing_channel, - rabbit_framing, - rabbit_heartbeat, - rabbit_load, - rabbit_log, - rabbit_memsup_linux, - rabbit_misc, - rabbit_mnesia, - rabbit_multi, - rabbit_networking, - rabbit_node_monitor, - rabbit_persister, - rabbit_reader, - rabbit_router, - rabbit_sasl_report_file_h, - rabbit_sup, - rabbit_tests, - rabbit_tracer, - rabbit_writer, - tcp_acceptor, - tcp_acceptor_sup, - tcp_client_sup, - tcp_listener, - tcp_listener_sup]}, - {registered, [rabbit_amqqueue_sup, - rabbit_log, - rabbit_node_monitor, - rabbit_persister, - rabbit_router, - rabbit_sup, - rabbit_tcp_client_sup]}, - {applications, [kernel, stdlib, sasl, mnesia, os_mon]}, - {mod, {rabbit, []}}, - {env, [{tcp_listeners, [{"0.0.0.0", 5672}]}, - {extra_startup_steps, []}, - {default_user, <<"guest">>}, - {default_pass, <<"guest">>}, - {default_vhost, <<"/">>}, - {memory_alarms, auto}]}]}. diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in new file mode 100644 index 00000000..e2f36c0f --- /dev/null +++ b/ebin/rabbit_app.in @@ -0,0 +1,20 @@ +{application, rabbit, %% -*- erlang -*- + [{description, "RabbitMQ"}, + {id, "RabbitMQ"}, + {vsn, "%%VERSION%%"}, + {modules, []}, + {registered, [rabbit_amqqueue_sup, + rabbit_log, + rabbit_node_monitor, + rabbit_persister, + rabbit_router, + rabbit_sup, + rabbit_tcp_client_sup]}, + {applications, [kernel, stdlib, sasl, mnesia, os_mon]}, + {mod, {rabbit, []}}, + {env, [{tcp_listeners, [{"0.0.0.0", 5672}]}, + {extra_startup_steps, []}, + {default_user, <<"guest">>}, + {default_pass, <<"guest">>}, + {default_vhost, <<"/">>}, + {memory_alarms, auto}]}]}. diff --git a/generate_app b/generate_app new file mode 100644 index 00000000..62301292 --- /dev/null +++ b/generate_app @@ -0,0 +1,10 @@ +#!/usr/bin/env escript +%% -*- erlang -*- + +main([BeamDir]) -> + Modules = [list_to_atom(filename:basename(F, ".beam")) || + F <- filelib:wildcard("*.beam", BeamDir)], + {ok, {application, Application, Properties}} = io:read(''), + NewProperties = lists:keyreplace(modules, 1, Properties, + {modules, Modules}), + io:format("~p.", [{application, Application, NewProperties}]). diff --git a/src/buffering_proxy.erl b/src/buffering_proxy.erl deleted file mode 100644 index 344b719a..00000000 --- a/src/buffering_proxy.erl +++ /dev/null @@ -1,108 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(buffering_proxy). - --export([start_link/2]). - -%% internal - --export([mainloop/4, drain/2]). --export([proxy_loop/3]). - --define(HIBERNATE_AFTER, 5000). - -%%---------------------------------------------------------------------------- - -start_link(M, A) -> - spawn_link( - fun () -> process_flag(trap_exit, true), - ProxyPid = self(), - Ref = make_ref(), - Pid = spawn_link( - fun () -> ProxyPid ! Ref, - mainloop(ProxyPid, Ref, M, - M:init(ProxyPid, A)) end), - proxy_loop(Ref, Pid, empty) - end). - -%%---------------------------------------------------------------------------- - -mainloop(ProxyPid, Ref, M, State) -> - NewState = - receive - {Ref, Messages} -> - NewSt = - lists:foldl(fun (Msg, S) -> - drain(M, M:handle_message(Msg, S)) - end, State, lists:reverse(Messages)), - ProxyPid ! Ref, - NewSt; - Msg -> M:handle_message(Msg, State) - after ?HIBERNATE_AFTER -> - erlang:hibernate(?MODULE, mainloop, - [ProxyPid, Ref, M, State]) - end, - ?MODULE:mainloop(ProxyPid, Ref, M, NewState). - -drain(M, State) -> - receive - Msg -> ?MODULE:drain(M, M:handle_message(Msg, State)) - after 0 -> - State - end. - -proxy_loop(Ref, Pid, State) -> - receive - Ref -> - ?MODULE:proxy_loop( - Ref, Pid, - case State of - empty -> waiting; - waiting -> exit(duplicate_next); - Messages -> Pid ! {Ref, Messages}, empty - end); - {'EXIT', Pid, Reason} -> - exit(Reason); - {'EXIT', _, Reason} -> - exit(Pid, Reason), - ?MODULE:proxy_loop(Ref, Pid, State); - Msg -> - ?MODULE:proxy_loop( - Ref, Pid, - case State of - empty -> [Msg]; - waiting -> Pid ! {Ref, [Msg]}, empty; - Messages -> [Msg | Messages] - end) - after ?HIBERNATE_AFTER -> - erlang:hibernate(?MODULE, proxy_loop, [Ref, Pid, State]) - end. diff --git a/src/gen_server2.erl b/src/gen_server2.erl new file mode 100644 index 00000000..11bb66d7 --- /dev/null +++ b/src/gen_server2.erl @@ -0,0 +1,854 @@ +%% This file is a copy of gen_server.erl from the R11B-5 Erlang/OTP +%% distribution, with the following modifications: +%% +%% 1) the module name is gen_server2 +%% +%% 2) more efficient handling of selective receives in callbacks +%% gen_server2 processes drain their message queue into an internal +%% buffer before invoking any callback module functions. Messages are +%% dequeued from the buffer for processing. Thus the effective message +%% queue of a gen_server2 process is the concatenation of the internal +%% buffer and the real message queue. +%% As a result of the draining, any selective receive invoked inside a +%% callback is less likely to have to scan a large message queue. +%% +%% 3) gen_server2:cast is guaranteed to be order-preserving +%% The original code could reorder messages when communicating with a +%% process on a remote node that was not currently connected. +%% +%% All modifications are (C) 2009 LShift Ltd. + +%% ``The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved via the world wide web at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% The Initial Developer of the Original Code is Ericsson Utvecklings AB. +%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings +%% AB. All Rights Reserved.'' +%% +%% $Id$ +%% +-module(gen_server2). + +%%% --------------------------------------------------- +%%% +%%% The idea behind THIS server is that the user module +%%% provides (different) functions to handle different +%%% kind of inputs. +%%% If the Parent process terminates the Module:terminate/2 +%%% function is called. +%%% +%%% The user module should export: +%%% +%%% init(Args) +%%% ==> {ok, State} +%%% {ok, State, Timeout} +%%% ignore +%%% {stop, Reason} +%%% +%%% handle_call(Msg, {From, Tag}, State) +%%% +%%% ==> {reply, Reply, State} +%%% {reply, Reply, State, Timeout} +%%% {noreply, State} +%%% {noreply, State, Timeout} +%%% {stop, Reason, Reply, State} +%%% Reason = normal | shutdown | Term terminate(State) is called +%%% +%%% handle_cast(Msg, State) +%%% +%%% ==> {noreply, State} +%%% {noreply, State, Timeout} +%%% {stop, Reason, State} +%%% Reason = normal | shutdown | Term terminate(State) is called +%%% +%%% handle_info(Info, State) Info is e.g. {'EXIT', P, R}, {nodedown, N}, ... +%%% +%%% ==> {noreply, State} +%%% {noreply, State, Timeout} +%%% {stop, Reason, State} +%%% Reason = normal | shutdown | Term, terminate(State) is called +%%% +%%% terminate(Reason, State) Let the user module clean up +%%% always called when server terminates +%%% +%%% ==> ok +%%% +%%% +%%% The work flow (of the server) can be described as follows: +%%% +%%% User module Generic +%%% ----------- ------- +%%% start -----> start +%%% init <----- . +%%% +%%% loop +%%% handle_call <----- . +%%% -----> reply +%%% +%%% handle_cast <----- . +%%% +%%% handle_info <----- . +%%% +%%% terminate <----- . +%%% +%%% -----> reply +%%% +%%% +%%% --------------------------------------------------- + +%% API +-export([start/3, start/4, + start_link/3, start_link/4, + call/2, call/3, + cast/2, reply/2, + abcast/2, abcast/3, + multi_call/2, multi_call/3, multi_call/4, + enter_loop/3, enter_loop/4, enter_loop/5]). + +-export([behaviour_info/1]). + +%% System exports +-export([system_continue/3, + system_terminate/4, + system_code_change/4, + format_status/2]). + +%% Internal exports +-export([init_it/6, print_event/3]). + +-import(error_logger, [format/2]). + +%%%========================================================================= +%%% API +%%%========================================================================= + +behaviour_info(callbacks) -> + [{init,1},{handle_call,3},{handle_cast,2},{handle_info,2}, + {terminate,2},{code_change,3}]; +behaviour_info(_Other) -> + undefined. + +%%% ----------------------------------------------------------------- +%%% Starts a generic server. +%%% start(Mod, Args, Options) +%%% start(Name, Mod, Args, Options) +%%% start_link(Mod, Args, Options) +%%% start_link(Name, Mod, Args, Options) where: +%%% Name ::= {local, atom()} | {global, atom()} +%%% Mod ::= atom(), callback module implementing the 'real' server +%%% Args ::= term(), init arguments (to Mod:init/1) +%%% Options ::= [{timeout, Timeout} | {debug, [Flag]}] +%%% Flag ::= trace | log | {logfile, File} | statistics | debug +%%% (debug == log && statistics) +%%% Returns: {ok, Pid} | +%%% {error, {already_started, Pid}} | +%%% {error, Reason} +%%% ----------------------------------------------------------------- +start(Mod, Args, Options) -> + gen:start(?MODULE, nolink, Mod, Args, Options). + +start(Name, Mod, Args, Options) -> + gen:start(?MODULE, nolink, Name, Mod, Args, Options). + +start_link(Mod, Args, Options) -> + gen:start(?MODULE, link, Mod, Args, Options). + +start_link(Name, Mod, Args, Options) -> + gen:start(?MODULE, link, Name, Mod, Args, Options). + + +%% ----------------------------------------------------------------- +%% Make a call to a generic server. +%% If the server is located at another node, that node will +%% be monitored. +%% If the client is trapping exits and is linked server termination +%% is handled here (? Shall we do that here (or rely on timeouts) ?). +%% ----------------------------------------------------------------- +call(Name, Request) -> + case catch gen:call(Name, '$gen_call', Request) of + {ok,Res} -> + Res; + {'EXIT',Reason} -> + exit({Reason, {?MODULE, call, [Name, Request]}}) + end. + +call(Name, Request, Timeout) -> + case catch gen:call(Name, '$gen_call', Request, Timeout) of + {ok,Res} -> + Res; + {'EXIT',Reason} -> + exit({Reason, {?MODULE, call, [Name, Request, Timeout]}}) + end. + +%% ----------------------------------------------------------------- +%% Make a cast to a generic server. +%% ----------------------------------------------------------------- +cast({global,Name}, Request) -> + catch global:send(Name, cast_msg(Request)), + ok; +cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) -> + do_cast(Dest, Request); +cast(Dest, Request) when is_atom(Dest) -> + do_cast(Dest, Request); +cast(Dest, Request) when is_pid(Dest) -> + do_cast(Dest, Request). + +do_cast(Dest, Request) -> + do_send(Dest, cast_msg(Request)), + ok. + +cast_msg(Request) -> {'$gen_cast',Request}. + +%% ----------------------------------------------------------------- +%% Send a reply to the client. +%% ----------------------------------------------------------------- +reply({To, Tag}, Reply) -> + catch To ! {Tag, Reply}. + +%% ----------------------------------------------------------------- +%% Asyncronous broadcast, returns nothing, it's just send'n prey +%%----------------------------------------------------------------- +abcast(Name, Request) when is_atom(Name) -> + do_abcast([node() | nodes()], Name, cast_msg(Request)). + +abcast(Nodes, Name, Request) when is_list(Nodes), is_atom(Name) -> + do_abcast(Nodes, Name, cast_msg(Request)). + +do_abcast([Node|Nodes], Name, Msg) when is_atom(Node) -> + do_send({Name,Node},Msg), + do_abcast(Nodes, Name, Msg); +do_abcast([], _,_) -> abcast. + +%%% ----------------------------------------------------------------- +%%% Make a call to servers at several nodes. +%%% Returns: {[Replies],[BadNodes]} +%%% A Timeout can be given +%%% +%%% A middleman process is used in case late answers arrives after +%%% the timeout. If they would be allowed to glog the callers message +%%% queue, it would probably become confused. Late answers will +%%% now arrive to the terminated middleman and so be discarded. +%%% ----------------------------------------------------------------- +multi_call(Name, Req) + when is_atom(Name) -> + do_multi_call([node() | nodes()], Name, Req, infinity). + +multi_call(Nodes, Name, Req) + when is_list(Nodes), is_atom(Name) -> + do_multi_call(Nodes, Name, Req, infinity). + +multi_call(Nodes, Name, Req, infinity) -> + do_multi_call(Nodes, Name, Req, infinity); +multi_call(Nodes, Name, Req, Timeout) + when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 -> + do_multi_call(Nodes, Name, Req, Timeout). + + +%%----------------------------------------------------------------- +%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>) ->_ +%% +%% Description: Makes an existing process into a gen_server. +%% The calling process will enter the gen_server receive +%% loop and become a gen_server process. +%% The process *must* have been started using one of the +%% start functions in proc_lib, see proc_lib(3). +%% The user is responsible for any initialization of the +%% process, including registering a name for it. +%%----------------------------------------------------------------- +enter_loop(Mod, Options, State) -> + enter_loop(Mod, Options, State, self(), infinity). + +enter_loop(Mod, Options, State, ServerName = {_, _}) -> + enter_loop(Mod, Options, State, ServerName, infinity); + +enter_loop(Mod, Options, State, Timeout) -> + enter_loop(Mod, Options, State, self(), Timeout). + +enter_loop(Mod, Options, State, ServerName, Timeout) -> + Name = get_proc_name(ServerName), + Parent = get_parent(), + Debug = debug_options(Name, Options), + Queue = queue:new(), + loop(Parent, Name, State, Mod, Timeout, Queue, Debug). + +%%%======================================================================== +%%% Gen-callback functions +%%%======================================================================== + +%%% --------------------------------------------------- +%%% Initiate the new process. +%%% Register the name using the Rfunc function +%%% Calls the Mod:init/Args function. +%%% Finally an acknowledge is sent to Parent and the main +%%% loop is entered. +%%% --------------------------------------------------- +init_it(Starter, self, Name, Mod, Args, Options) -> + init_it(Starter, self(), Name, Mod, Args, Options); +init_it(Starter, Parent, Name, Mod, Args, Options) -> + Debug = debug_options(Name, Options), + Queue = queue:new(), + case catch Mod:init(Args) of + {ok, State} -> + proc_lib:init_ack(Starter, {ok, self()}), + loop(Parent, Name, State, Mod, infinity, Queue, Debug); + {ok, State, Timeout} -> + proc_lib:init_ack(Starter, {ok, self()}), + loop(Parent, Name, State, Mod, Timeout, Queue, Debug); + {stop, Reason} -> + proc_lib:init_ack(Starter, {error, Reason}), + exit(Reason); + ignore -> + proc_lib:init_ack(Starter, ignore), + exit(normal); + {'EXIT', Reason} -> + proc_lib:init_ack(Starter, {error, Reason}), + exit(Reason); + Else -> + Error = {bad_return_value, Else}, + proc_lib:init_ack(Starter, {error, Error}), + exit(Error) + end. + +%%%======================================================================== +%%% Internal functions +%%%======================================================================== +%%% --------------------------------------------------- +%%% The MAIN loop. +%%% --------------------------------------------------- +loop(Parent, Name, State, Mod, Time, Queue, Debug) -> + receive + Input -> loop(Parent, Name, State, Mod, + Time, queue:in(Input, Queue), Debug) + after 0 -> + case queue:out(Queue) of + {{value, Msg}, Queue1} -> + process_msg(Parent, Name, State, Mod, + Time, Queue1, Debug, Msg); + {empty, Queue1} -> + receive + Input -> + loop(Parent, Name, State, Mod, + Time, queue:in(Input, Queue1), Debug) + after Time -> + process_msg(Parent, Name, State, Mod, + Time, Queue1, Debug, timeout) + end + end + end. + +process_msg(Parent, Name, State, Mod, Time, Queue, Debug, Msg) -> + case Msg of + {system, From, Req} -> + sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, + [Name, State, Mod, Time, Queue]); + {'EXIT', Parent, Reason} -> + terminate(Reason, Name, Msg, Mod, State, Debug); + _Msg when Debug =:= [] -> + handle_msg(Msg, Parent, Name, State, Mod, Time, Queue); + _Msg -> + Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, + Name, {in, Msg}), + handle_msg(Msg, Parent, Name, State, Mod, Time, Queue, Debug1) + end. + +%%% --------------------------------------------------- +%%% Send/recive functions +%%% --------------------------------------------------- +do_send(Dest, Msg) -> + catch erlang:send(Dest, Msg). + +do_multi_call(Nodes, Name, Req, infinity) -> + Tag = make_ref(), + Monitors = send_nodes(Nodes, Name, Tag, Req), + rec_nodes(Tag, Monitors, Name, undefined); +do_multi_call(Nodes, Name, Req, Timeout) -> + Tag = make_ref(), + Caller = self(), + Receiver = + spawn( + fun() -> + %% Middleman process. Should be unsensitive to regular + %% exit signals. The sychronization is needed in case + %% the receiver would exit before the caller started + %% the monitor. + process_flag(trap_exit, true), + Mref = erlang:monitor(process, Caller), + receive + {Caller,Tag} -> + Monitors = send_nodes(Nodes, Name, Tag, Req), + TimerId = erlang:start_timer(Timeout, self(), ok), + Result = rec_nodes(Tag, Monitors, Name, TimerId), + exit({self(),Tag,Result}); + {'DOWN',Mref,_,_,_} -> + %% Caller died before sending us the go-ahead. + %% Give up silently. + exit(normal) + end + end), + Mref = erlang:monitor(process, Receiver), + Receiver ! {self(),Tag}, + receive + {'DOWN',Mref,_,_,{Receiver,Tag,Result}} -> + Result; + {'DOWN',Mref,_,_,Reason} -> + %% The middleman code failed. Or someone did + %% exit(_, kill) on the middleman process => Reason==killed + exit(Reason) + end. + +send_nodes(Nodes, Name, Tag, Req) -> + send_nodes(Nodes, Name, Tag, Req, []). + +send_nodes([Node|Tail], Name, Tag, Req, Monitors) + when is_atom(Node) -> + Monitor = start_monitor(Node, Name), + %% Handle non-existing names in rec_nodes. + catch {Name, Node} ! {'$gen_call', {self(), {Tag, Node}}, Req}, + send_nodes(Tail, Name, Tag, Req, [Monitor | Monitors]); +send_nodes([_Node|Tail], Name, Tag, Req, Monitors) -> + %% Skip non-atom Node + send_nodes(Tail, Name, Tag, Req, Monitors); +send_nodes([], _Name, _Tag, _Req, Monitors) -> + Monitors. + +%% Against old nodes: +%% If no reply has been delivered within 2 secs. (per node) check that +%% the server really exists and wait for ever for the answer. +%% +%% Against contemporary nodes: +%% Wait for reply, server 'DOWN', or timeout from TimerId. + +rec_nodes(Tag, Nodes, Name, TimerId) -> + rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId). + +rec_nodes(Tag, [{N,R}|Tail], Name, Badnodes, Replies, Time, TimerId ) -> + receive + {'DOWN', R, _, _, _} -> + rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId); + {{Tag, N}, Reply} -> %% Tag is bound !!! + unmonitor(R), + rec_nodes(Tag, Tail, Name, Badnodes, + [{N,Reply}|Replies], Time, TimerId); + {timeout, TimerId, _} -> + unmonitor(R), + %% Collect all replies that already have arrived + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) + end; +rec_nodes(Tag, [N|Tail], Name, Badnodes, Replies, Time, TimerId) -> + %% R6 node + receive + {nodedown, N} -> + monitor_node(N, false), + rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId); + {{Tag, N}, Reply} -> %% Tag is bound !!! + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes(Tag, Tail, Name, Badnodes, + [{N,Reply}|Replies], 2000, TimerId); + {timeout, TimerId, _} -> + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + %% Collect all replies that already have arrived + rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies) + after Time -> + case rpc:call(N, erlang, whereis, [Name]) of + Pid when is_pid(Pid) -> % It exists try again. + rec_nodes(Tag, [N|Tail], Name, Badnodes, + Replies, infinity, TimerId); + _ -> % badnode + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes(Tag, Tail, Name, [N|Badnodes], + Replies, 2000, TimerId) + end + end; +rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) -> + case catch erlang:cancel_timer(TimerId) of + false -> % It has already sent it's message + receive + {timeout, TimerId, _} -> ok + after 0 -> + ok + end; + _ -> % Timer was cancelled, or TimerId was 'undefined' + ok + end, + {Replies, Badnodes}. + +%% Collect all replies that already have arrived +rec_nodes_rest(Tag, [{N,R}|Tail], Name, Badnodes, Replies) -> + receive + {'DOWN', R, _, _, _} -> + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); + {{Tag, N}, Reply} -> %% Tag is bound !!! + unmonitor(R), + rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) + after 0 -> + unmonitor(R), + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) + end; +rec_nodes_rest(Tag, [N|Tail], Name, Badnodes, Replies) -> + %% R6 node + receive + {nodedown, N} -> + monitor_node(N, false), + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); + {{Tag, N}, Reply} -> %% Tag is bound !!! + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) + after 0 -> + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) + end; +rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) -> + {Replies, Badnodes}. + + +%%% --------------------------------------------------- +%%% Monitor functions +%%% --------------------------------------------------- + +start_monitor(Node, Name) when is_atom(Node), is_atom(Name) -> + if node() =:= nonode@nohost, Node =/= nonode@nohost -> + Ref = make_ref(), + self() ! {'DOWN', Ref, process, {Name, Node}, noconnection}, + {Node, Ref}; + true -> + case catch erlang:monitor(process, {Name, Node}) of + {'EXIT', _} -> + %% Remote node is R6 + monitor_node(Node, true), + Node; + Ref when is_reference(Ref) -> + {Node, Ref} + end + end. + +%% Cancels a monitor started with Ref=erlang:monitor(_, _). +unmonitor(Ref) when is_reference(Ref) -> + erlang:demonitor(Ref), + receive + {'DOWN', Ref, _, _, _} -> + true + after 0 -> + true + end. + +%%% --------------------------------------------------- +%%% Message handling functions +%%% --------------------------------------------------- + +dispatch({'$gen_cast', Msg}, Mod, State) -> + Mod:handle_cast(Msg, State); +dispatch(Info, Mod, State) -> + Mod:handle_info(Info, State). + +handle_msg({'$gen_call', From, Msg}, + Parent, Name, State, Mod, _Time, Queue) -> + case catch Mod:handle_call(Msg, From, State) of + {reply, Reply, NState} -> + reply(From, Reply), + loop(Parent, Name, NState, Mod, infinity, Queue, []); + {reply, Reply, NState, Time1} -> + reply(From, Reply), + loop(Parent, Name, NState, Mod, Time1, Queue, []); + {noreply, NState} -> + loop(Parent, Name, NState, Mod, infinity, Queue, []); + {noreply, NState, Time1} -> + loop(Parent, Name, NState, Mod, Time1, Queue, []); + {stop, Reason, Reply, NState} -> + {'EXIT', R} = + (catch terminate(Reason, Name, Msg, Mod, NState, [])), + reply(From, Reply), + exit(R); + Other -> handle_common_reply(Other, + Parent, Name, Msg, Mod, State, Queue) + end; +handle_msg(Msg, + Parent, Name, State, Mod, _Time, Queue) -> + Reply = (catch dispatch(Msg, Mod, State)), + handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue). + +handle_msg({'$gen_call', From, Msg}, + Parent, Name, State, Mod, _Time, Queue, Debug) -> + case catch Mod:handle_call(Msg, From, State) of + {reply, Reply, NState} -> + Debug1 = reply(Name, From, Reply, NState, Debug), + loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); + {reply, Reply, NState, Time1} -> + Debug1 = reply(Name, From, Reply, NState, Debug), + loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); + {noreply, NState} -> + Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); + {noreply, NState, Time1} -> + Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); + {stop, Reason, Reply, NState} -> + {'EXIT', R} = + (catch terminate(Reason, Name, Msg, Mod, NState, Debug)), + reply(Name, From, Reply, NState, Debug), + exit(R); + Other -> + handle_common_reply(Other, + Parent, Name, Msg, Mod, State, Queue, Debug) + end; +handle_msg(Msg, + Parent, Name, State, Mod, _Time, Queue, Debug) -> + Reply = (catch dispatch(Msg, Mod, State)), + handle_common_reply(Reply, + Parent, Name, Msg, Mod, State, Queue, Debug). + +handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue) -> + case Reply of + {noreply, NState} -> + loop(Parent, Name, NState, Mod, infinity, Queue, []); + {noreply, NState, Time1} -> + loop(Parent, Name, NState, Mod, Time1, Queue, []); + {stop, Reason, NState} -> + terminate(Reason, Name, Msg, Mod, NState, []); + {'EXIT', What} -> + terminate(What, Name, Msg, Mod, State, []); + _ -> + terminate({bad_return_value, Reply}, Name, Msg, Mod, State, []) + end. + +handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue, Debug) -> + case Reply of + {noreply, NState} -> + Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); + {noreply, NState, Time1} -> + Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); + {stop, Reason, NState} -> + terminate(Reason, Name, Msg, Mod, NState, Debug); + {'EXIT', What} -> + terminate(What, Name, Msg, Mod, State, Debug); + _ -> + terminate({bad_return_value, Reply}, Name, Msg, Mod, State, Debug) + end. + +reply(Name, {To, Tag}, Reply, State, Debug) -> + reply({To, Tag}, Reply), + sys:handle_debug(Debug, {?MODULE, print_event}, Name, + {out, Reply, To, State} ). + + +%%----------------------------------------------------------------- +%% Callback functions for system messages handling. +%%----------------------------------------------------------------- +system_continue(Parent, Debug, [Name, State, Mod, Time, Queue]) -> + loop(Parent, Name, State, Mod, Time, Queue, Debug). + +system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, _Queue]) -> + terminate(Reason, Name, [], Mod, State, Debug). + +system_code_change([Name, State, Mod, Time, Queue], _Module, OldVsn, Extra) -> + case catch Mod:code_change(OldVsn, State, Extra) of + {ok, NewState} -> {ok, [Name, NewState, Mod, Time, Queue]}; + Else -> Else + end. + +%%----------------------------------------------------------------- +%% Format debug messages. Print them as the call-back module sees +%% them, not as the real erlang messages. Use trace for that. +%%----------------------------------------------------------------- +print_event(Dev, {in, Msg}, Name) -> + case Msg of + {'$gen_call', {From, _Tag}, Call} -> + io:format(Dev, "*DBG* ~p got call ~p from ~w~n", + [Name, Call, From]); + {'$gen_cast', Cast} -> + io:format(Dev, "*DBG* ~p got cast ~p~n", + [Name, Cast]); + _ -> + io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg]) + end; +print_event(Dev, {out, Msg, To, State}, Name) -> + io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n", + [Name, Msg, To, State]); +print_event(Dev, {noreply, State}, Name) -> + io:format(Dev, "*DBG* ~p new state ~w~n", [Name, State]); +print_event(Dev, Event, Name) -> + io:format(Dev, "*DBG* ~p dbg ~p~n", [Name, Event]). + + +%%% --------------------------------------------------- +%%% Terminate the server. +%%% --------------------------------------------------- + +terminate(Reason, Name, Msg, Mod, State, Debug) -> + case catch Mod:terminate(Reason, State) of + {'EXIT', R} -> + error_info(R, Name, Msg, State, Debug), + exit(R); + _ -> + case Reason of + normal -> + exit(normal); + shutdown -> + exit(shutdown); + _ -> + error_info(Reason, Name, Msg, State, Debug), + exit(Reason) + end + end. + +error_info(_Reason, application_controller, _Msg, _State, _Debug) -> + %% OTP-5811 Don't send an error report if it's the system process + %% application_controller which is terminating - let init take care + %% of it instead + ok; +error_info(Reason, Name, Msg, State, Debug) -> + Reason1 = + case Reason of + {undef,[{M,F,A}|MFAs]} -> + case code:is_loaded(M) of + false -> + {'module could not be loaded',[{M,F,A}|MFAs]}; + _ -> + case erlang:function_exported(M, F, length(A)) of + true -> + Reason; + false -> + {'function not exported',[{M,F,A}|MFAs]} + end + end; + _ -> + Reason + end, + format("** Generic server ~p terminating \n" + "** Last message in was ~p~n" + "** When Server state == ~p~n" + "** Reason for termination == ~n** ~p~n", + [Name, Msg, State, Reason1]), + sys:print_log(Debug), + ok. + +%%% --------------------------------------------------- +%%% Misc. functions. +%%% --------------------------------------------------- + +opt(Op, [{Op, Value}|_]) -> + {ok, Value}; +opt(Op, [_|Options]) -> + opt(Op, Options); +opt(_, []) -> + false. + +debug_options(Name, Opts) -> + case opt(debug, Opts) of + {ok, Options} -> dbg_options(Name, Options); + _ -> dbg_options(Name, []) + end. + +dbg_options(Name, []) -> + Opts = + case init:get_argument(generic_debug) of + error -> + []; + _ -> + [log, statistics] + end, + dbg_opts(Name, Opts); +dbg_options(Name, Opts) -> + dbg_opts(Name, Opts). + +dbg_opts(Name, Opts) -> + case catch sys:debug_options(Opts) of + {'EXIT',_} -> + format("~p: ignoring erroneous debug options - ~p~n", + [Name, Opts]), + []; + Dbg -> + Dbg + end. + +get_proc_name(Pid) when is_pid(Pid) -> + Pid; +get_proc_name({local, Name}) -> + case process_info(self(), registered_name) of + {registered_name, Name} -> + Name; + {registered_name, _Name} -> + exit(process_not_registered); + [] -> + exit(process_not_registered) + end; +get_proc_name({global, Name}) -> + case global:safe_whereis_name(Name) of + undefined -> + exit(process_not_registered_globally); + Pid when Pid =:= self() -> + Name; + _Pid -> + exit(process_not_registered_globally) + end. + +get_parent() -> + case get('$ancestors') of + [Parent | _] when is_pid(Parent)-> + Parent; + [Parent | _] when is_atom(Parent)-> + name_to_pid(Parent); + _ -> + exit(process_was_not_started_by_proc_lib) + end. + +name_to_pid(Name) -> + case whereis(Name) of + undefined -> + case global:safe_whereis_name(Name) of + undefined -> + exit(could_not_find_registerd_name); + Pid -> + Pid + end; + Pid -> + Pid + end. + +%%----------------------------------------------------------------- +%% Status information +%%----------------------------------------------------------------- +format_status(Opt, StatusData) -> + [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time, Queue]] = + StatusData, + NameTag = if is_pid(Name) -> + pid_to_list(Name); + is_atom(Name) -> + Name + end, + Header = lists:concat(["Status for generic server ", NameTag]), + Log = sys:get_debug(log, Debug, []), + Specfic = + case erlang:function_exported(Mod, format_status, 2) of + true -> + case catch Mod:format_status(Opt, [PDict, State]) of + {'EXIT', _} -> [{data, [{"State", State}]}]; + Else -> Else + end; + _ -> + [{data, [{"State", State}]}] + end, + [{header, Header}, + {data, [{"Status", SysState}, + {"Parent", Parent}, + {"Logged events", Log}, + {"Queued messages", queue:to_list(Queue)}]} | + Specfic]. diff --git a/src/rabbit.erl b/src/rabbit.erl index 41064c77..30b8c394 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -75,14 +75,14 @@ start() -> try ok = ensure_working_log_handlers(), ok = rabbit_mnesia:ensure_mnesia_dir(), - ok = start_applications(?APPS) + ok = rabbit_misc:start_applications(?APPS) after %%give the error loggers some time to catch up timer:sleep(100) end. stop() -> - ok = stop_applications(?APPS). + ok = rabbit_misc:stop_applications(?APPS). stop_and_halt() -> spawn(fun () -> @@ -109,34 +109,6 @@ rotate_logs(BinarySuffix) -> %%-------------------------------------------------------------------- -manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) -> - Iterate(fun (App, Acc) -> - case Do(App) of - ok -> [App | Acc]; - {error, {SkipError, _}} -> Acc; - {error, Reason} -> - lists:foreach(Undo, Acc), - throw({error, {ErrorTag, App, Reason}}) - end - end, [], Apps), - ok. - -start_applications(Apps) -> - manage_applications(fun lists:foldl/3, - fun application:start/1, - fun application:stop/1, - already_started, - cannot_start_application, - Apps). - -stop_applications(Apps) -> - manage_applications(fun lists:foldr/3, - fun application:stop/1, - fun application:start/1, - not_started, - cannot_stop_application, - Apps). - start(normal, []) -> {ok, SupPid} = rabbit_sup:start_link(), diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index b73090fc..36270efd 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -186,6 +186,8 @@ add_vhost(VHostPath) -> [{<<"">>, direct}, {<<"amq.direct">>, direct}, {<<"amq.topic">>, topic}, + {<<"amq.match">>, headers}, %% per 0-9-1 pdf + {<<"amq.headers">>, headers}, %% per 0-9-1 xml {<<"amq.fanout">>, fanout}]], ok; [_] -> diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index dee71d23..875624ba 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -53,7 +53,7 @@ -spec(start/1 :: (bool() | 'auto') -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(register/2 :: (pid(), mfa_tuple()) -> 'ok'). - + -endif. %%---------------------------------------------------------------------------- @@ -101,7 +101,7 @@ handle_call({register, Pid, HighMemMFA}, end, NewAlertees = dict:store(Pid, HighMemMFA, Alertess), {ok, ok, State#alarms{alertees = NewAlertees}}; - + handle_call(_Request, State) -> {ok, not_understood, State}. @@ -135,7 +135,7 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- start_memsup() -> - Mod = case os:type() of + Mod = case os:type() of %% memsup doesn't take account of buffers or cache when %% considering "free" memory - therefore on Linux we can %% get memory alarms very easily without any pressure @@ -143,7 +143,7 @@ start_memsup() -> %% our own simple memory monitor. %% {unix, linux} -> rabbit_memsup_linux; - + %% Start memsup programmatically rather than via the %% rabbitmq-server script. This is not quite the right %% thing to do as os_mon checks to see if memsup is diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 2b9abb29..abbdce66 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -37,13 +37,13 @@ stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]). -export([list/1, info/1, info/2, info_all/1, info_all/2]). -export([claim_queue/2]). --export([basic_get/3, basic_consume/7, basic_cancel/4]). --export([notify_sent/2]). --export([commit_all/2, rollback_all/2, notify_down_all/2]). +-export([basic_get/3, basic_consume/8, basic_cancel/4]). +-export([notify_sent/2, unblock/2]). +-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -import(mnesia). --import(gen_server). +-import(gen_server2). -import(lists). -import(queue). @@ -91,15 +91,17 @@ -spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). +-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> {'ok', non_neg_integer(), msg()} | 'empty'). --spec(basic_consume/7 :: - (amqqueue(), bool(), pid(), pid(), ctag(), bool(), any()) -> +-spec(basic_consume/8 :: + (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) -> 'ok' | {'error', 'queue_owned_by_another_connection' | 'exclusive_consume_unavailable'}). -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). +-spec(unblock/2 :: (pid(), pid()) -> 'ok'). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). @@ -197,10 +199,10 @@ list(VHostPath) -> map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> - gen_server:call(QPid, info). + gen_server2:call(QPid, info). info(#amqqueue{ pid = QPid }, Items) -> - case gen_server:call(QPid, {info, Items}) of + case gen_server2:call(QPid, {info, Items}) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -209,45 +211,45 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). -stat(#amqqueue{pid = QPid}) -> gen_server:call(QPid, stat). +stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat). stat_all() -> lists:map(fun stat/1, rabbit_misc:dirty_read_all(amqqueue)). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> - gen_server:call(QPid, {delete, IfUnused, IfEmpty}). + gen_server2:call(QPid, {delete, IfUnused, IfEmpty}). -purge(#amqqueue{ pid = QPid }) -> gen_server:call(QPid, purge). +purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge). deliver(_IsMandatory, true, Txn, Message, QPid) -> - gen_server:call(QPid, {deliver_immediately, Txn, Message}); + gen_server2:call(QPid, {deliver_immediately, Txn, Message}); deliver(true, _IsImmediate, Txn, Message, QPid) -> - gen_server:call(QPid, {deliver, Txn, Message}), + gen_server2:call(QPid, {deliver, Txn, Message}), true; deliver(false, _IsImmediate, Txn, Message, QPid) -> - gen_server:cast(QPid, {deliver, Txn, Message}), + gen_server2:cast(QPid, {deliver, Txn, Message}), true. redeliver(QPid, Messages) -> - gen_server:cast(QPid, {redeliver, Messages}). + gen_server2:cast(QPid, {redeliver, Messages}). requeue(QPid, MsgIds, ChPid) -> - gen_server:cast(QPid, {requeue, MsgIds, ChPid}). + gen_server2:cast(QPid, {requeue, MsgIds, ChPid}). ack(QPid, Txn, MsgIds, ChPid) -> - gen_server:cast(QPid, {ack, Txn, MsgIds, ChPid}). + gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}). commit_all(QPids, Txn) -> Timeout = length(QPids) * ?CALL_TIMEOUT, safe_pmap_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end, + fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, Timeout) end, QPids). rollback_all(QPids, Txn) -> safe_pmap_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end, + fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn}) end, QPids). notify_down_all(QPids, ChPid) -> @@ -256,25 +258,34 @@ notify_down_all(QPids, ChPid) -> %% we don't care if the queue process has terminated in the %% meantime fun (_) -> ok end, - fun (QPid) -> gen_server:call(QPid, {notify_down, ChPid}, Timeout) end, + fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, Timeout) end, QPids). +limit_all(QPids, ChPid, LimiterPid) -> + safe_pmap_ok( + fun (_) -> ok end, + fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end, + QPids). + claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> - gen_server:call(QPid, {claim_queue, ReaderPid}). + gen_server2:call(QPid, {claim_queue, ReaderPid}). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> - gen_server:call(QPid, {basic_get, ChPid, NoAck}). + gen_server2:call(QPid, {basic_get, ChPid, NoAck}). -basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, +basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> - gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, - ConsumerTag, ExclusiveConsume, OkMsg}). + gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, + LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> - ok = gen_server:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). + ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). notify_sent(QPid, ChPid) -> - gen_server:cast(QPid, {notify_sent, ChPid}). + gen_server2:cast(QPid, {notify_sent, ChPid}). + +unblock(QPid, ChPid) -> + gen_server2:cast(QPid, {unblock, ChPid}). internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6282a8fb..c390b2b7 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -33,7 +33,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --behaviour(gen_server). +-behaviour(gen_server2). -define(UNSENT_MESSAGE_LIMIT, 100). -define(HIBERNATE_AFTER, 1000). @@ -62,9 +62,10 @@ %% These are held in our process dictionary -record(cr, {consumers, ch_pid, + limiter_pid, monitor_ref, unacked_messages, - is_overload_protection_active, + is_limit_active, unsent_message_count}). -define(INFO_KEYS, @@ -85,7 +86,7 @@ %%---------------------------------------------------------------------------- start_link(Q) -> - gen_server:start_link(?MODULE, Q, []). + gen_server2:start_link(?MODULE, Q, []). %%---------------------------------------------------------------------------- @@ -131,7 +132,7 @@ ch_record(ChPid) -> ch_pid = ChPid, monitor_ref = MonitorRef, unacked_messages = dict:new(), - is_overload_protection_active = false, + is_limit_active = false, unsent_message_count = 0}, put(Key, C), C; @@ -144,20 +145,16 @@ store_ch_record(C = #cr{ch_pid = ChPid}) -> all_ch_record() -> [C || {{ch, _}, C} <- get()]. -update_store_and_maybe_block_ch( - C = #cr{is_overload_protection_active = Active, - unsent_message_count = Count}) -> - {Result, NewActive} = - if - not(Active) and (Count > ?UNSENT_MESSAGE_LIMIT) -> - {block_ch, true}; - Active and (Count == 0) -> - {unblock_ch, false}; - true -> - {ok, Active} - end, - store_ch_record(C#cr{is_overload_protection_active = NewActive}), - Result. +is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> + Limited orelse Count > ?UNSENT_MESSAGE_LIMIT. + +ch_record_state_transition(OldCR, NewCR) -> + BlockedOld = is_ch_blocked(OldCR), + BlockedNew = is_ch_blocked(NewCR), + if BlockedOld andalso not(BlockedNew) -> unblock; + BlockedNew andalso not(BlockedOld) -> block; + true -> ok + end. deliver_immediately(Message, Delivered, State = #q{q = #amqqueue{name = QName}, @@ -168,26 +165,37 @@ deliver_immediately(Message, Delivered, {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}}, RoundRobinTail} -> - rabbit_channel:deliver( - ChPid, ConsumerTag, AckRequired, - {QName, self(), NextId, Delivered, Message}), - C = #cr{unsent_message_count = Count, + C = #cr{limiter_pid = LimiterPid, + unsent_message_count = Count, unacked_messages = UAM} = ch_record(ChPid), - NewUAM = case AckRequired of - true -> dict:store(NextId, Message, UAM); - false -> UAM - end, - NewConsumers = - case update_store_and_maybe_block_ch( - C#cr{unsent_message_count = Count + 1, - unacked_messages = NewUAM}) of - ok -> queue:in(QEntry, RoundRobinTail); - block_ch -> block_consumers(ChPid, RoundRobinTail) - end, - {offered, AckRequired, State#q{round_robin = NewConsumers, - next_msg_id = NextId +1}}; + case not(AckRequired) orelse rabbit_limiter:can_send( + LimiterPid, self()) of + true -> + rabbit_channel:deliver( + ChPid, ConsumerTag, AckRequired, + {QName, self(), NextId, Delivered, Message}), + NewUAM = case AckRequired of + true -> dict:store(NextId, Message, UAM); + false -> UAM + end, + NewC = C#cr{unsent_message_count = Count + 1, + unacked_messages = NewUAM}, + store_ch_record(NewC), + NewConsumers = + case ch_record_state_transition(C, NewC) of + ok -> queue:in(QEntry, RoundRobinTail); + block -> block_consumers(ChPid, RoundRobinTail) + end, + {offered, AckRequired, State#q{round_robin = NewConsumers, + next_msg_id = NextId + 1}}; + false -> + store_ch_record(C#cr{is_limit_active = true}), + NewConsumers = block_consumers(ChPid, RoundRobinTail), + deliver_immediately(Message, Delivered, + State#q{round_robin = NewConsumers}) + end; {empty, _} -> - not_offered + {not_offered, State} end. attempt_delivery(none, Message, State) -> @@ -198,8 +206,8 @@ attempt_delivery(none, Message, State) -> persist_message(none, qname(State), Message), persist_delivery(qname(State), Message, false), {true, State1}; - not_offered -> - {false, State} + {not_offered, State1} -> + {false, State1} end; attempt_delivery(Txn, Message, State) -> persist_message(Txn, qname(State), Message), @@ -237,16 +245,22 @@ block_consumer(ChPid, ConsumerTag, RoundRobin) -> (CP /= ChPid) or (CT /= ConsumerTag) end, queue:to_list(RoundRobin))). -possibly_unblock(C = #cr{consumers = Consumers, ch_pid = ChPid}, - State = #q{round_robin = RoundRobin}) -> - case update_store_and_maybe_block_ch(C) of - ok -> +possibly_unblock(State, ChPid, Update) -> + case lookup_ch(ChPid) of + not_found -> State; - unblock_ch -> - run_poke_burst(State#q{round_robin = - unblock_consumers(ChPid, Consumers, RoundRobin)}) + C -> + NewC = Update(C), + store_ch_record(NewC), + case ch_record_state_transition(C, NewC) of + ok -> State; + unblock -> NewRR = unblock_consumers(ChPid, + NewC#cr.consumers, + State#q.round_robin), + run_poke_burst(State#q{round_robin = NewRR}) + end end. - + check_auto_delete(State = #q{q = #amqqueue{auto_delete = false}}) -> {continue, State}; check_auto_delete(State = #q{has_had_consumers = false}) -> @@ -301,7 +315,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, {stop, normal, NewState} end end. - + cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) -> none; cancel_holder(_ChPid, _ConsumerTag, Holder) -> @@ -334,8 +348,8 @@ run_poke_burst(MessageBuffer, State) -> {offered, false, NewState} -> persist_auto_ack(qname(State), Message), run_poke_burst(BufferTail, NewState); - not_offered -> - State#q{message_buffer = MessageBuffer} + {not_offered, NewState} -> + NewState#q{message_buffer = MessageBuffer} end; {empty, _} -> State#q{message_buffer = MessageBuffer} @@ -500,8 +514,8 @@ i(messages_uncommitted, _) -> #tx{pending_messages = Pending} <- all_tx_record()]); i(messages, State) -> lists:sum([i(Item, State) || Item <- [messages_ready, - messages_unacknowledged, - messages_uncommitted]]); + messages_unacknowledged, + messages_uncommitted]]); i(acks_uncommitted, _) -> lists:sum([length(Pending) || #tx{pending_acks = Pending} <- all_tx_record()]); @@ -552,14 +566,14 @@ handle_call({deliver, Txn, Message}, _From, State) -> handle_call({commit, Txn}, From, State) -> ok = commit_work(Txn, qname(State)), %% optimisation: we reply straight away so the sender can continue - gen_server:reply(From, ok), + gen_server2:reply(From, ok), NewState = process_pending(Txn, State), erase_tx(Txn), noreply(NewState); handle_call({notify_down, ChPid}, From, State) -> %% optimisation: we reply straight away so the sender can continue - gen_server:reply(From, ok), + gen_server2:reply(From, ok), handle_ch_down(ChPid, State); handle_call({basic_get, ChPid, NoAck}, _From, @@ -586,8 +600,8 @@ handle_call({basic_get, ChPid, NoAck}, _From, reply(empty, State) end; -handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, - ExclusiveConsume, OkMsg}, +handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, + ConsumerTag, ExclusiveConsume, OkMsg}, _From, State = #q{owner = Owner, exclusive_consumer = ExistingHolder, round_robin = RoundRobin}) -> @@ -601,8 +615,13 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, ok -> C = #cr{consumers = Consumers} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)}, - C1 = C#cr{consumers = [Consumer | Consumers]}, - store_ch_record(C1), + store_ch_record(C#cr{consumers = [Consumer | Consumers], + limiter_pid = LimiterPid}), + if Consumers == [] -> + ok = rabbit_limiter:register(LimiterPid, self()); + true -> + ok + end, State1 = State#q{has_had_consumers = true, exclusive_consumer = if @@ -622,12 +641,16 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, not_found -> ok = maybe_send_reply(ChPid, OkMsg), reply(ok, State); - C = #cr{consumers = Consumers} -> + C = #cr{consumers = Consumers, limiter_pid = LimiterPid} -> NewConsumers = lists:filter (fun (#consumer{tag = CT}) -> CT /= ConsumerTag end, Consumers), - C1 = C#cr{consumers = NewConsumers}, - store_ch_record(C1), + store_ch_record(C#cr{consumers = NewConsumers}), + if NewConsumers == [] -> + ok = rabbit_limiter:unregister(LimiterPid, self()); + true -> + ok + end, ok = maybe_send_reply(ChPid, OkMsg), case check_auto_delete( State#q{exclusive_consumer = cancel_holder(ChPid, @@ -730,14 +753,33 @@ handle_cast({requeue, MsgIds, ChPid}, State) -> [{Message, true} || Message <- Messages], State)) end; +handle_cast({unblock, ChPid}, State) -> + noreply( + possibly_unblock(State, ChPid, + fun (C) -> C#cr{is_limit_active = false} end)); + handle_cast({notify_sent, ChPid}, State) -> - case lookup_ch(ChPid) of - not_found -> noreply(State); - T = #cr{unsent_message_count =Count} -> - noreply(possibly_unblock( - T#cr{unsent_message_count = Count - 1}, - State)) - end. + noreply( + possibly_unblock(State, ChPid, + fun (C = #cr{unsent_message_count = Count}) -> + C#cr{unsent_message_count = Count - 1} + end)); + +handle_cast({limit, ChPid, LimiterPid}, State) -> + noreply( + possibly_unblock( + State, ChPid, + fun (C = #cr{consumers = Consumers, + limiter_pid = OldLimiterPid, + is_limit_active = Limited}) -> + if Consumers =/= [] andalso OldLimiterPid == undefined -> + ok = rabbit_limiter:register(LimiterPid, self()); + true -> + ok + end, + NewLimited = Limited andalso LimiterPid =/= undefined, + C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} + end)). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> @@ -758,7 +800,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_info(timeout, State) -> %% TODO: Once we drop support for R11B-5, we can change this to %% {noreply, State, hibernate}; - proc_lib:hibernate(gen_server, enter_loop, [?MODULE, [], State]); + proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]); handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ca2782c7..376e39c6 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -33,18 +33,21 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). +-behaviour(gen_server2). + -export([start_link/4, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, conserve_memory/2]). -%% callbacks --export([init/2, handle_message/2]). +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). --record(ch, {state, proxy_pid, reader_pid, writer_pid, +-record(ch, {state, reader_pid, writer_pid, limiter_pid, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, consumer_mapping}). +-define(HIBERNATE_AFTER, 1000). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -62,111 +65,118 @@ %%---------------------------------------------------------------------------- start_link(ReaderPid, WriterPid, Username, VHost) -> - buffering_proxy:start_link(?MODULE, [ReaderPid, WriterPid, - Username, VHost]). + {ok, Pid} = gen_server2:start_link( + ?MODULE, [ReaderPid, WriterPid, Username, VHost], []), + Pid. do(Pid, Method) -> do(Pid, Method, none). do(Pid, Method, Content) -> - Pid ! {method, Method, Content}, - ok. + gen_server2:cast(Pid, {method, Method, Content}). shutdown(Pid) -> - Pid ! terminate, - ok. + gen_server2:cast(Pid, terminate). send_command(Pid, Msg) -> - Pid ! {command, Msg}, - ok. + gen_server2:cast(Pid, {command, Msg}). deliver(Pid, ConsumerTag, AckRequired, Msg) -> - Pid ! {deliver, ConsumerTag, AckRequired, Msg}, - ok. + gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). conserve_memory(Pid, Conserve) -> - Pid ! {conserve_memory, Conserve}, - ok. + gen_server2:cast(Pid, {conserve_memory, Conserve}). %%--------------------------------------------------------------------------- -init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> +init([ReaderPid, WriterPid, Username, VHost]) -> process_flag(trap_exit, true), link(WriterPid), - %% this is bypassing the proxy so alarms can "jump the queue" and - %% be handled promptly rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), - #ch{state = starting, - proxy_pid = ProxyPid, - reader_pid = ReaderPid, - writer_pid = WriterPid, - transaction_id = none, - tx_participants = sets:new(), - next_tag = 1, - uncommitted_ack_q = queue:new(), - unacked_message_q = queue:new(), - username = Username, - virtual_host = VHost, - most_recently_declared_queue = <<>>, - consumer_mapping = dict:new()}. - -handle_message({method, Method, Content}, State) -> + {ok, #ch{state = starting, + reader_pid = ReaderPid, + writer_pid = WriterPid, + limiter_pid = undefined, + transaction_id = none, + tx_participants = sets:new(), + next_tag = 1, + uncommitted_ack_q = queue:new(), + unacked_message_q = queue:new(), + username = Username, + virtual_host = VHost, + most_recently_declared_queue = <<>>, + consumer_mapping = dict:new()}}. + +handle_call(_Request, _From, State) -> + noreply(State). + +handle_cast({method, Method, Content}, State) -> try handle_method(Method, Content, State) of {reply, Reply, NewState} -> ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply), - NewState; + noreply(NewState); {noreply, NewState} -> - NewState; + noreply(NewState); stop -> - exit(normal) + {stop, normal, State#ch{state = terminating}} catch exit:{amqp, Error, Explanation, none} -> - terminate({amqp, Error, Explanation, - rabbit_misc:method_record_type(Method)}, - State); + {stop, {amqp, Error, Explanation, + rabbit_misc:method_record_type(Method)}, State}; exit:normal -> - terminate(normal, State); + {stop, normal, State}; _:Reason -> - terminate({Reason, erlang:get_stacktrace()}, State) + {stop, {Reason, erlang:get_stacktrace()}, State} end; -handle_message(terminate, State) -> - terminate(normal, State); +handle_cast(terminate, State) -> + {stop, normal, State}; -handle_message({command, Msg}, State = #ch{writer_pid = WriterPid}) -> +handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, Msg), - State; + noreply(State); -handle_message({deliver, ConsumerTag, AckRequired, Msg}, - State = #ch{proxy_pid = ProxyPid, - writer_pid = WriterPid, - next_tag = DeliveryTag}) -> +handle_cast({deliver, ConsumerTag, AckRequired, Msg}, + State = #ch{writer_pid = WriterPid, + next_tag = DeliveryTag}) -> State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State), - ok = internal_deliver(WriterPid, ProxyPid, - true, ConsumerTag, DeliveryTag, Msg), - State1#ch{next_tag = DeliveryTag + 1}; + ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg), + noreply(State1#ch{next_tag = DeliveryTag + 1}); -handle_message({conserve_memory, Conserve}, State) -> +handle_cast({conserve_memory, Conserve}, State) -> ok = rabbit_writer:send_command( State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}), - State; + noreply(State). -handle_message({'EXIT', _Pid, Reason}, State) -> - terminate(Reason, State); +handle_info({'EXIT', _Pid, Reason}, State) -> + {stop, Reason, State}; -handle_message(Other, State) -> - terminate({unexpected_channel_message, Other}, State). +handle_info(timeout, State) -> + %% TODO: Once we drop support for R11B-5, we can change this to + %% {noreply, State, hibernate}; + proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]). -%%--------------------------------------------------------------------------- +terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid, + state = terminating}) -> + rabbit_writer:shutdown(WriterPid), + rabbit_limiter:shutdown(LimiterPid); -terminate(Reason, State = #ch{writer_pid = WriterPid}) -> +terminate(Reason, State = #ch{writer_pid = WriterPid, + limiter_pid = LimiterPid}) -> Res = notify_queues(internal_rollback(State)), case Reason of normal -> ok = Res; _ -> ok end, rabbit_writer:shutdown(WriterPid), - exit(Reason). + rabbit_limiter:shutdown(LimiterPid). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%--------------------------------------------------------------------------- + +noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}. return_ok(State, true, _Msg) -> {noreply, State}; return_ok(State, false, Msg) -> {reply, Msg, State}. @@ -248,7 +258,6 @@ handle_method(_Method, _, #ch{state = starting}) -> handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> ok = notify_queues(internal_rollback(State)), ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}), - ok = rabbit_writer:shutdown(WriterPid), stop; handle_method(#'access.request'{},_, State) -> @@ -273,7 +282,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, content = DecodedContent, persistent_key = PersistentKey}, - rabbit_exchange:route(Exchange, RoutingKey), State)}; + rabbit_exchange:route(Exchange, RoutingKey, DecodedContent), State)}; handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, @@ -286,9 +295,10 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, true -> ok end, {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), - Participants = ack(State#ch.proxy_pid, TxnKey, Acked), + Participants = ack(TxnKey, Acked), {noreply, case TxnKey of - none -> State#ch{unacked_message_q = Remaining}; + none -> ok = notify_limiter(State#ch.limiter_pid, Acked), + State#ch{unacked_message_q = Remaining}; _ -> NewUAQ = queue:join(State#ch.uncommitted_ack_q, Acked), add_tx_participants( @@ -299,12 +309,12 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, - _, State = #ch{ proxy_pid = ProxyPid, writer_pid = WriterPid, + _, State = #ch{ writer_pid = WriterPid, next_tag = DeliveryTag }) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), case rabbit_amqqueue:with_or_die( QueueName, - fun (Q) -> rabbit_amqqueue:basic_get(Q, ProxyPid, NoAck) end) of + fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, Msg = {_QName, _QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, @@ -330,8 +340,8 @@ handle_method(#'basic.consume'{queue = QueueNameBin, no_ack = NoAck, exclusive = ExclusiveConsume, nowait = NoWait}, - _, State = #ch{ proxy_pid = ProxyPid, - reader_pid = ReaderPid, + _, State = #ch{ reader_pid = ReaderPid, + limiter_pid = LimiterPid, consumer_mapping = ConsumerMapping }) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> @@ -349,7 +359,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, QueueName, fun (Q) -> rabbit_amqqueue:basic_consume( - Q, NoAck, ReaderPid, ProxyPid, + Q, NoAck, ReaderPid, self(), LimiterPid, ActualConsumerTag, ExclusiveConsume, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})) @@ -380,8 +390,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, - _, State = #ch{ proxy_pid = ProxyPid, - consumer_mapping = ConsumerMapping }) -> + _, State = #ch{consumer_mapping = ConsumerMapping }) -> OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, case dict:find(ConsumerTag, ConsumerMapping) of error -> @@ -402,7 +411,7 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, %% cancel_ok ourselves it might overtake a %% message sent previously by the queue. rabbit_amqqueue:basic_cancel( - Q, ProxyPid, ConsumerTag, + Q, self(), ConsumerTag, ok_msg(NoWait, #'basic.cancel_ok'{ consumer_tag = ConsumerTag})) end) of @@ -414,13 +423,34 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, end end; -handle_method(#'basic.qos'{}, _, State) -> - %% FIXME: Need to implement QOS - {reply, #'basic.qos_ok'{}, State}; +handle_method(#'basic.qos'{global = true}, _, _State) -> + rabbit_misc:protocol_error(not_implemented, "global=true", []); + +handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> + rabbit_misc:protocol_error(not_implemented, + "prefetch_size!=0 (~w)", [Size]); + +handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, + _, State = #ch{ limiter_pid = LimiterPid }) -> + NewLimiterPid = case {LimiterPid, PrefetchCount} of + {undefined, 0} -> + undefined; + {undefined, _} -> + LPid = rabbit_limiter:start_link(self()), + ok = limit_queues(LPid, State), + LPid; + {_, 0} -> + ok = rabbit_limiter:shutdown(LimiterPid), + ok = limit_queues(undefined, State), + undefined; + {_, _} -> + LimiterPid + end, + ok = rabbit_limiter:limit(NewLimiterPid, PrefetchCount), + {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = NewLimiterPid}}; handle_method(#'basic.recover'{requeue = true}, _, State = #ch{ transaction_id = none, - proxy_pid = ProxyPid, unacked_message_q = UAMQ }) -> ok = fold_per_queue( fun (QPid, MsgIds, ok) -> @@ -429,14 +459,13 @@ handle_method(#'basic.recover'{requeue = true}, %% order. To keep it happy we reverse the id list %% since we are given them in reverse order. rabbit_amqqueue:requeue( - QPid, lists:reverse(MsgIds), ProxyPid) + QPid, lists:reverse(MsgIds), self()) end, ok, UAMQ), %% No answer required, apparently! {noreply, State#ch{unacked_message_q = queue:new()}}; handle_method(#'basic.recover'{requeue = false}, _, State = #ch{ transaction_id = none, - proxy_pid = ProxyPid, writer_pid = WriterPid, unacked_message_q = UAMQ }) -> lists:foreach( @@ -454,8 +483,7 @@ handle_method(#'basic.recover'{requeue = false}, %% %% FIXME: should we allocate a fresh DeliveryTag? ok = internal_deliver( - WriterPid, ProxyPid, - false, ConsumerTag, DeliveryTag, + WriterPid, false, ConsumerTag, DeliveryTag, {QName, QPid, MsgId, true, Message}) end, queue:to_list(UAMQ)), %% No answer required, apparently! @@ -744,10 +772,10 @@ add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) -> State#ch{tx_participants = sets:union(Participants, sets:from_list(MoreP))}. -ack(ProxyPid, TxnKey, UAQ) -> +ack(TxnKey, UAQ) -> fold_per_queue( fun (QPid, MsgIds, L) -> - ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, ProxyPid), + ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, self()), [QPid | L] end, [], UAQ). @@ -762,7 +790,9 @@ internal_commit(State = #ch{transaction_id = TxnKey, tx_participants = Participants}) -> case rabbit_amqqueue:commit_all(sets:to_list(Participants), TxnKey) of - ok -> new_tx(State); + ok -> ok = notify_limiter(State#ch.limiter_pid, + State#ch.uncommitted_ack_q), + new_tx(State); {error, Errors} -> rabbit_misc:protocol_error( internal_error, "commit failed: ~w", [Errors]) end. @@ -799,19 +829,37 @@ fold_per_queue(F, Acc0, UAQ) -> dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). -notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) -> - rabbit_amqqueue:notify_down_all( - [QPid || QueueName <- - sets:to_list( - dict:fold(fun (_ConsumerTag, QueueName, S) -> - sets:add_element(QueueName, S) - end, sets:new(), Consumers)), - case rabbit_amqqueue:lookup(QueueName) of - {ok, Q} -> QPid = Q#amqqueue.pid, true; - %% queue has been deleted in the meantime - {error, not_found} -> QPid = none, false - end], - ProxyPid). +notify_queues(#ch{consumer_mapping = Consumers}) -> + rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()). + +limit_queues(LPid, #ch{consumer_mapping = Consumers}) -> + rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid). + +consumer_queues(Consumers) -> + [QPid || QueueName <- + sets:to_list( + dict:fold(fun (_ConsumerTag, QueueName, S) -> + sets:add_element(QueueName, S) + end, sets:new(), Consumers)), + case rabbit_amqqueue:lookup(QueueName) of + {ok, Q} -> QPid = Q#amqqueue.pid, true; + %% queue has been deleted in the meantime + {error, not_found} -> QPid = none, false + end]. + +%% tell the limiter about the number of acks that have been received +%% for messages delivered to subscribed consumers, but not acks for +%% messages sent in a response to a basic.get (identified by their +%% 'none' consumer tag) +notify_limiter(undefined, _Acked) -> + ok; +notify_limiter(LimiterPid, Acked) -> + case lists:foldl(fun ({_, none, _}, Acc) -> Acc; + ({_, _, _}, Acc) -> Acc + 1 + end, 0, queue:to_list(Acked)) of + 0 -> ok; + Count -> rabbit_limiter:ack(LimiterPid, Count) + end. is_message_persistent(#content{properties = #'P_basic'{ delivery_mode = Mode}}) -> @@ -829,7 +877,7 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> lock_message(false, _MsgStruct, State) -> State. -internal_deliver(WriterPid, ChPid, Notify, ConsumerTag, DeliveryTag, +internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, @@ -841,6 +889,6 @@ internal_deliver(WriterPid, ChPid, Notify, ConsumerTag, DeliveryTag, routing_key = RoutingKey}, ok = case Notify of true -> rabbit_writer:send_command_and_notify( - WriterPid, QPid, ChPid, M, Content); + WriterPid, QPid, self(), M, Content); false -> rabbit_writer:send_command(WriterPid, M, Content) end. diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl index 9a9220b5..183b6984 100644 --- a/src/rabbit_error_logger_file_h.erl +++ b/src/rabbit_error_logger_file_h.erl @@ -46,7 +46,7 @@ init({{File, Suffix}, []}) -> case rabbit_misc:append_file(File, Suffix) of ok -> ok; {error, Error} -> - rabbit_log:error("Failed to append contents of " ++ + rabbit_log:error("Failed to append contents of " "log file '~s' to '~s':~n~p~n", [File, [File, Suffix], Error]) end, diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 925c335c..960e4945 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -37,11 +37,11 @@ -export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info/1, info/2, info_all/1, info_all/2, simple_publish/6, simple_publish/3, - route/2]). + route/3]). -export([add_binding/4, delete_binding/4, list_bindings/1]). -export([delete/2]). -export([delete_bindings_for_queue/1]). --export([check_type/1, assert_type/2, topic_matches/2]). +-export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]). %% EXTENDED API -export([list_exchange_bindings/1]). @@ -77,7 +77,7 @@ (bool(), bool(), exchange_name(), routing_key(), binary(), binary()) -> publish_res()). -spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()). --spec(route/2 :: (exchange(), routing_key()) -> [pid()]). +-spec(route/3 :: (exchange(), routing_key(), decoded_content()) -> [pid()]). -spec(add_binding/4 :: (exchange_name(), queue_name(), routing_key(), amqp_table()) -> bind_res() | {'error', 'durability_settings_incompatible'}). @@ -88,6 +88,7 @@ [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). -spec(delete_bindings_for_queue/1 :: (queue_name()) -> 'ok'). -spec(topic_matches/2 :: (binary(), binary()) -> bool()). +-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> bool()). -spec(delete/2 :: (exchange_name(), bool()) -> 'ok' | not_found() | {'error', 'in_use'}). -spec(list_queue_bindings/1 :: (queue_name()) -> @@ -145,6 +146,8 @@ check_type(<<"direct">>) -> direct; check_type(<<"topic">>) -> topic; +check_type(<<"headers">>) -> + headers; check_type(T) -> rabbit_misc:protocol_error( command_invalid, "invalid exchange type '~s'", [T]). @@ -211,54 +214,69 @@ simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, %% Usable by Erlang code that wants to publish messages. simple_publish(Mandatory, Immediate, Message = #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey}) -> + routing_key = RoutingKey, + content = Content}) -> case lookup(ExchangeName) of {ok, Exchange} -> - QPids = route(Exchange, RoutingKey), + QPids = route(Exchange, RoutingKey, Content), rabbit_router:deliver(QPids, Mandatory, Immediate, none, Message); {error, Error} -> {error, Error} end. +sort_arguments(Arguments) -> + lists:keysort(1, Arguments). + %% return the list of qpids to which a message with a given routing %% key, sent to a particular exchange, should be delivered. %% %% The function ensures that a qpid appears in the return list exactly %% as many times as a message should be delivered to it. With the %% current exchange types that is at most once. -%% +route(X = #exchange{type = topic}, RoutingKey, _Content) -> + match_bindings(X, fun (#binding{key = BindingKey}) -> + topic_matches(BindingKey, RoutingKey) + end); + +route(X = #exchange{type = headers}, _RoutingKey, Content) -> + Headers = case (Content#content.properties)#'P_basic'.headers of + undefined -> []; + H -> sort_arguments(H) + end, + match_bindings(X, fun (#binding{args = Spec}) -> + headers_match(Spec, Headers) + end); + +route(X = #exchange{type = fanout}, _RoutingKey, _Content) -> + match_routing_key(X, '_'); + +route(X = #exchange{type = direct}, RoutingKey, _Content) -> + match_routing_key(X, RoutingKey). + %% TODO: Maybe this should be handled by a cursor instead. -route(#exchange{name = Name, type = topic}, RoutingKey) -> - Query = qlc:q([QName || - #route{binding = #binding{ - exchange_name = ExchangeName, - queue_name = QName, - key = BindingKey}} <- mnesia:table(route), - ExchangeName == Name, - %% TODO: This causes a full scan for each entry - %% with the same exchange (see bug 19336) - topic_matches(BindingKey, RoutingKey)]), +%% TODO: This causes a full scan for each entry with the same exchange +match_bindings(#exchange{name = Name}, Match) -> + Query = qlc:q([QName || #route{binding = Binding = #binding{ + exchange_name = ExchangeName, + queue_name = QName}} <- + mnesia:table(route), + ExchangeName == Name, + Match(Binding)]), lookup_qpids( try mnesia:async_dirty(fun qlc:e/1, [Query]) catch exit:{aborted, {badarg, _}} -> %% work around OTP-7025, which was fixed in R12B-1, by %% falling back on a less efficient method - [QName || #route{binding = #binding{queue_name = QName, - key = BindingKey}} <- + [QName || #route{binding = Binding = #binding{ + queue_name = QName}} <- mnesia:dirty_match_object( #route{binding = #binding{exchange_name = Name, _ = '_'}}), - topic_matches(BindingKey, RoutingKey)] - end); - -route(X = #exchange{type = fanout}, _) -> - route_internal(X, '_'); - -route(X = #exchange{type = direct}, RoutingKey) -> - route_internal(X, RoutingKey). + Match(Binding)] + end). -route_internal(#exchange{name = Name}, RoutingKey) -> +match_routing_key(#exchange{name = Name}, RoutingKey) -> MatchHead = #route{binding = #binding{exchange_name = Name, queue_name = '$1', key = RoutingKey, @@ -377,7 +395,7 @@ sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) -> Binding = #binding{exchange_name = ExchangeName, queue_name = QueueName, key = RoutingKey, - args = Arguments}, + args = sort_arguments(Arguments)}, ok = case Durable of true -> Fun(durable_routes, #route{binding = Binding}, write); false -> ok @@ -429,6 +447,67 @@ reverse_binding(#binding{exchange_name = Exchange, key = Key, args = Args}. +default_headers_match_kind() -> all. + +parse_x_match(<<"all">>) -> all; +parse_x_match(<<"any">>) -> any; +parse_x_match(Other) -> + rabbit_log:warning("Invalid x-match field value ~p; expected all or any", + [Other]), + default_headers_match_kind(). + +%% Horrendous matching algorithm. Depends for its merge-like +%% (linear-time) behaviour on the lists:keysort (sort_arguments) that +%% route/3 and sync_binding/6 do. +%% +%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. +%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +%% +headers_match(Pattern, Data) -> + MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of + {value, {_, longstr, MK}} -> parse_x_match(MK); + {value, {_, Type, MK}} -> + rabbit_log:warning("Invalid x-match field type ~p " + "(value ~p); expected longstr", + [Type, MK]), + default_headers_match_kind(); + _ -> default_headers_match_kind() + end, + headers_match(Pattern, Data, true, false, MatchKind). + +headers_match([], _Data, AllMatch, _AnyMatch, all) -> + AllMatch; +headers_match([], _Data, _AllMatch, AnyMatch, any) -> + AnyMatch; +headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data, + AllMatch, AnyMatch, MatchKind) -> + headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind); +headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) -> + headers_match([], [], false, AnyMatch, MatchKind); +headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest], + AllMatch, AnyMatch, MatchKind) when PK > DK -> + headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind); +headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _], + _AllMatch, AnyMatch, MatchKind) when PK < DK -> + headers_match(PRest, Data, false, AnyMatch, MatchKind); +headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], + AllMatch, AnyMatch, MatchKind) when PK == DK -> + {AllMatch1, AnyMatch1} = + if + %% It's not properly specified, but a "no value" in a + %% pattern field is supposed to mean simple presence of + %% the corresponding data field. I've interpreted that to + %% mean a type of "void" for the pattern field. + PT == void -> {AllMatch, true}; + %% Similarly, it's not specified, but I assume that a + %% mismatched type causes a mismatched value. + PT =/= DT -> {false, AnyMatch}; + PV == DV -> {AllMatch, true}; + true -> {false, AnyMatch} + end, + headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). + split_topic_key(Key) -> {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."), KeySplit. diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl new file mode 100644 index 00000000..20a66ac5 --- /dev/null +++ b/src/rabbit_limiter.erl @@ -0,0 +1,195 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_limiter). + +-behaviour(gen_server). + +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, + handle_info/2]). +-export([start_link/1, shutdown/1]). +-export([limit/2, can_send/2, ack/2, register/2, unregister/2]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(maybe_pid() :: pid() | 'undefined'). + +-spec(start_link/1 :: (pid()) -> pid()). +-spec(shutdown/1 :: (maybe_pid()) -> 'ok'). +-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). +-spec(can_send/2 :: (maybe_pid(), pid()) -> bool()). +-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). +-spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). +-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +-record(lim, {prefetch_count = 0, + ch_pid, + queues = dict:new(), % QPid -> {MonitorRef, Notify} + volume = 0}). +%% 'Notify' is a boolean that indicates whether a queue should be +%% notified of a change in the limit or volume that may allow it to +%% deliver more messages via the limiter's channel. + +%%---------------------------------------------------------------------------- +%% API +%%---------------------------------------------------------------------------- + +start_link(ChPid) -> + {ok, Pid} = gen_server:start_link(?MODULE, [ChPid], []), + Pid. + +shutdown(undefined) -> + ok; +shutdown(LimiterPid) -> + unlink(LimiterPid), + gen_server:cast(LimiterPid, shutdown). + +limit(undefined, 0) -> + ok; +limit(LimiterPid, PrefetchCount) -> + gen_server:cast(LimiterPid, {limit, PrefetchCount}). + +%% Ask the limiter whether the queue can deliver a message without +%% breaching a limit +can_send(undefined, _QPid) -> + true; +can_send(LimiterPid, QPid) -> + rabbit_misc:with_exit_handler( + fun () -> true end, + fun () -> gen_server:call(LimiterPid, {can_send, QPid}) end). + +%% Let the limiter know that the channel has received some acks from a +%% consumer +ack(undefined, _Count) -> ok; +ack(LimiterPid, Count) -> gen_server:cast(LimiterPid, {ack, Count}). + +register(undefined, _QPid) -> ok; +register(LimiterPid, QPid) -> gen_server:cast(LimiterPid, {register, QPid}). + +unregister(undefined, _QPid) -> ok; +unregister(LimiterPid, QPid) -> gen_server:cast(LimiterPid, {unregister, QPid}). + +%%---------------------------------------------------------------------------- +%% gen_server callbacks +%%---------------------------------------------------------------------------- + +init([ChPid]) -> + {ok, #lim{ch_pid = ChPid} }. + +handle_call({can_send, QPid}, _From, State = #lim{volume = Volume}) -> + case limit_reached(State) of + true -> {reply, false, limit_queue(QPid, State)}; + false -> {reply, true, State#lim{volume = Volume + 1}} + end. + +handle_cast(shutdown, State) -> + {stop, normal, State}; + +handle_cast({limit, PrefetchCount}, State) -> + {noreply, maybe_notify(State, State#lim{prefetch_count = PrefetchCount})}; + +handle_cast({ack, Count}, State = #lim{volume = Volume}) -> + NewVolume = if Volume == 0 -> 0; + true -> Volume - Count + end, + {noreply, maybe_notify(State, State#lim{volume = NewVolume})}; + +handle_cast({register, QPid}, State) -> + {noreply, remember_queue(QPid, State)}; + +handle_cast({unregister, QPid}, State) -> + {noreply, forget_queue(QPid, State)}. + +handle_info({'DOWN', _MonitorRef, _Type, QPid, _Info}, State) -> + {noreply, forget_queue(QPid, State)}. + +terminate(_, _) -> + ok. + +code_change(_, State, _) -> + State. + +%%---------------------------------------------------------------------------- +%% Internal plumbing +%%---------------------------------------------------------------------------- + +maybe_notify(OldState, NewState) -> + case limit_reached(OldState) andalso not(limit_reached(NewState)) of + true -> notify_queues(NewState); + false -> NewState + end. + +limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> + Limit =/= 0 andalso Volume >= Limit. + +remember_queue(QPid, State = #lim{queues = Queues}) -> + case dict:is_key(QPid, Queues) of + false -> MRef = erlang:monitor(process, QPid), + State#lim{queues = dict:store(QPid, {MRef, false}, Queues)}; + true -> State + end. + +forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) -> + case dict:find(QPid, Queues) of + {ok, {MRef, _}} -> + true = erlang:demonitor(MRef), + ok = rabbit_amqqueue:unblock(QPid, ChPid), + State#lim{queues = dict:erase(QPid, Queues)}; + error -> State + end. + +limit_queue(QPid, State = #lim{queues = Queues}) -> + UpdateFun = fun ({MRef, _}) -> {MRef, true} end, + State#lim{queues = dict:update(QPid, UpdateFun, Queues)}. + +notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> + {QList, NewQueues} = + dict:fold(fun (_QPid, {_, false}, Acc) -> Acc; + (QPid, {MRef, true}, {L, D}) -> + {[QPid | L], dict:store(QPid, {MRef, false}, D)} + end, {[], Queues}, Queues), + case length(QList) of + 0 -> ok; + L -> + %% We randomly vary the position of queues in the list, + %% thus ensuring that each queue has an equal chance of + %% being notified first. + {L1, L2} = lists:split(random:uniform(L), QList), + [ok = rabbit_amqqueue:unblock(Q, ChPid) || Q <- L2 ++ L1], + ok + end, + State#lim{queues = NewQueues}. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 973e163b..85db50d7 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -50,6 +50,7 @@ -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). +-export([start_applications/1, stop_applications/1]). -import(mnesia). -import(lists). @@ -108,6 +109,8 @@ -spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}). -spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok'). -spec(format_stderr/2 :: (string(), [any()]) -> 'true'). +-spec(start_applications/1 :: ([atom()]) -> 'ok'). +-spec(stop_applications/1 :: ([atom()]) -> 'ok'). -endif. @@ -398,3 +401,32 @@ format_stderr(Fmt, Args) -> Port = open_port({fd, 0, 2}, [out]), port_command(Port, io_lib:format(Fmt, Args)), port_close(Port). + +manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) -> + Iterate(fun (App, Acc) -> + case Do(App) of + ok -> [App | Acc]; + {error, {SkipError, _}} -> Acc; + {error, Reason} -> + lists:foreach(Undo, Acc), + throw({error, {ErrorTag, App, Reason}}) + end + end, [], Apps), + ok. + +start_applications(Apps) -> + manage_applications(fun lists:foldl/3, + fun application:start/1, + fun application:stop/1, + already_started, + cannot_start_application, + Apps). + +stop_applications(Apps) -> + manage_applications(fun lists:foldr/3, + fun application:stop/1, + fun application:start/1, + not_started, + cannot_stop_application, + Apps). + diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index d19c37cb..eebb38fa 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -243,8 +243,8 @@ init_db(ClusterNodes) -> %% NB: we cannot use rabbit_log here since %% it may not have been started yet error_logger:warning_msg( - "schema integrity check failed: ~p~n" ++ - "moving database to backup location " ++ + "schema integrity check failed: ~p~n" + "moving database to backup location " "and recreating schema from scratch~n", [Reason]), ok = move_db(), diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index ad653a2f..26d857be 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -100,7 +100,7 @@ deliver_per_node(NodeQPids, Mandatory = false, Immediate = false, %% than the non-immediate case below. {ok, lists:flatmap( fun ({Node, QPids}) -> - gen_server:cast( + gen_server2:cast( {?SERVER, Node}, {deliver, QPids, Mandatory, Immediate, Txn, Message}), QPids @@ -110,7 +110,7 @@ deliver_per_node(NodeQPids, Mandatory, Immediate, Txn, Message) -> R = rabbit_misc:upmap( fun ({Node, QPids}) -> - try gen_server:call( + try gen_server2:call( {?SERVER, Node}, {deliver, QPids, Mandatory, Immediate, Txn, Message}) catch diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl index 9e4c9c8a..2a365ce1 100644 --- a/src/rabbit_sasl_report_file_h.erl +++ b/src/rabbit_sasl_report_file_h.erl @@ -47,7 +47,7 @@ init({{File, Suffix}, []}) -> case rabbit_misc:append_file(File, Suffix) of ok -> ok; {error, Error} -> - rabbit_log:error("Failed to append contents of " ++ + rabbit_log:error("Failed to append contents of " "sasl log file '~s' to '~s':~n~p~n", [File, [File, Suffix], Error]) end, |