diff options
author | Tony Garnock-Jones <tonyg@lshift.net> | 2009-05-18 17:34:25 +0100 |
---|---|---|
committer | Tony Garnock-Jones <tonyg@lshift.net> | 2009-05-18 17:34:25 +0100 |
commit | 0f45dfdeb7dc41be14e57e6a4dd6490728349ea2 (patch) | |
tree | eddc87d08b0c86b4658dfb31ab466be809b8b9e8 /src | |
parent | 17c60aec273b88d3f36d3f63a659025ea687a961 (diff) | |
parent | 3f3b658532cc348bd932ae358bfb2e8b66bdec2c (diff) | |
download | rabbitmq-server-0f45dfdeb7dc41be14e57e6a4dd6490728349ea2.tar.gz |
merge v1_5 into default
Diffstat (limited to 'src')
-rw-r--r-- | src/buffering_proxy.erl | 108 | ||||
-rw-r--r-- | src/gen_server2.erl | 898 | ||||
-rw-r--r-- | src/priority_queue.erl | 153 | ||||
-rw-r--r-- | src/rabbit.erl | 42 | ||||
-rw-r--r-- | src/rabbit_access_control.erl | 189 | ||||
-rw-r--r-- | src/rabbit_alarm.erl | 11 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 96 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 178 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 310 | ||||
-rw-r--r-- | src/rabbit_control.erl | 93 | ||||
-rw-r--r-- | src/rabbit_error_logger_file_h.erl | 2 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 234 | ||||
-rw-r--r-- | src/rabbit_framing_channel.erl | 6 | ||||
-rw-r--r-- | src/rabbit_guid.erl | 3 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 195 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 36 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 75 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 8 | ||||
-rw-r--r-- | src/rabbit_persister.erl | 54 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 32 | ||||
-rw-r--r-- | src/rabbit_router.erl | 13 | ||||
-rw-r--r-- | src/rabbit_sasl_report_file_h.erl | 2 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 88 |
23 files changed, 2208 insertions, 618 deletions
diff --git a/src/buffering_proxy.erl b/src/buffering_proxy.erl deleted file mode 100644 index 344b719a..00000000 --- a/src/buffering_proxy.erl +++ /dev/null @@ -1,108 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(buffering_proxy). - --export([start_link/2]). - -%% internal - --export([mainloop/4, drain/2]). --export([proxy_loop/3]). - --define(HIBERNATE_AFTER, 5000). - -%%---------------------------------------------------------------------------- - -start_link(M, A) -> - spawn_link( - fun () -> process_flag(trap_exit, true), - ProxyPid = self(), - Ref = make_ref(), - Pid = spawn_link( - fun () -> ProxyPid ! Ref, - mainloop(ProxyPid, Ref, M, - M:init(ProxyPid, A)) end), - proxy_loop(Ref, Pid, empty) - end). - -%%---------------------------------------------------------------------------- - -mainloop(ProxyPid, Ref, M, State) -> - NewState = - receive - {Ref, Messages} -> - NewSt = - lists:foldl(fun (Msg, S) -> - drain(M, M:handle_message(Msg, S)) - end, State, lists:reverse(Messages)), - ProxyPid ! Ref, - NewSt; - Msg -> M:handle_message(Msg, State) - after ?HIBERNATE_AFTER -> - erlang:hibernate(?MODULE, mainloop, - [ProxyPid, Ref, M, State]) - end, - ?MODULE:mainloop(ProxyPid, Ref, M, NewState). - -drain(M, State) -> - receive - Msg -> ?MODULE:drain(M, M:handle_message(Msg, State)) - after 0 -> - State - end. - -proxy_loop(Ref, Pid, State) -> - receive - Ref -> - ?MODULE:proxy_loop( - Ref, Pid, - case State of - empty -> waiting; - waiting -> exit(duplicate_next); - Messages -> Pid ! {Ref, Messages}, empty - end); - {'EXIT', Pid, Reason} -> - exit(Reason); - {'EXIT', _, Reason} -> - exit(Pid, Reason), - ?MODULE:proxy_loop(Ref, Pid, State); - Msg -> - ?MODULE:proxy_loop( - Ref, Pid, - case State of - empty -> [Msg]; - waiting -> Pid ! {Ref, [Msg]}, empty; - Messages -> [Msg | Messages] - end) - after ?HIBERNATE_AFTER -> - erlang:hibernate(?MODULE, proxy_loop, [Ref, Pid, State]) - end. diff --git a/src/gen_server2.erl b/src/gen_server2.erl new file mode 100644 index 00000000..ba8becfc --- /dev/null +++ b/src/gen_server2.erl @@ -0,0 +1,898 @@ +%% This file is a copy of gen_server.erl from the R11B-5 Erlang/OTP +%% distribution, with the following modifications: +%% +%% 1) the module name is gen_server2 +%% +%% 2) more efficient handling of selective receives in callbacks +%% gen_server2 processes drain their message queue into an internal +%% buffer before invoking any callback module functions. Messages are +%% dequeued from the buffer for processing. Thus the effective message +%% queue of a gen_server2 process is the concatenation of the internal +%% buffer and the real message queue. +%% As a result of the draining, any selective receive invoked inside a +%% callback is less likely to have to scan a large message queue. +%% +%% 3) gen_server2:cast is guaranteed to be order-preserving +%% The original code could reorder messages when communicating with a +%% process on a remote node that was not currently connected. +%% +%% 4) The new functions gen_server2:pcall/3, pcall/4, and pcast/3 +%% allow callers to attach priorities to requests. Requests with +%% higher priorities are processed before requests with lower +%% priorities. The default priority is 0. +%% +%% All modifications are (C) 2009 LShift Ltd. + +%% ``The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved via the world wide web at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% The Initial Developer of the Original Code is Ericsson Utvecklings AB. +%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings +%% AB. All Rights Reserved.'' +%% +%% $Id$ +%% +-module(gen_server2). + +%%% --------------------------------------------------- +%%% +%%% The idea behind THIS server is that the user module +%%% provides (different) functions to handle different +%%% kind of inputs. +%%% If the Parent process terminates the Module:terminate/2 +%%% function is called. +%%% +%%% The user module should export: +%%% +%%% init(Args) +%%% ==> {ok, State} +%%% {ok, State, Timeout} +%%% ignore +%%% {stop, Reason} +%%% +%%% handle_call(Msg, {From, Tag}, State) +%%% +%%% ==> {reply, Reply, State} +%%% {reply, Reply, State, Timeout} +%%% {noreply, State} +%%% {noreply, State, Timeout} +%%% {stop, Reason, Reply, State} +%%% Reason = normal | shutdown | Term terminate(State) is called +%%% +%%% handle_cast(Msg, State) +%%% +%%% ==> {noreply, State} +%%% {noreply, State, Timeout} +%%% {stop, Reason, State} +%%% Reason = normal | shutdown | Term terminate(State) is called +%%% +%%% handle_info(Info, State) Info is e.g. {'EXIT', P, R}, {nodedown, N}, ... +%%% +%%% ==> {noreply, State} +%%% {noreply, State, Timeout} +%%% {stop, Reason, State} +%%% Reason = normal | shutdown | Term, terminate(State) is called +%%% +%%% terminate(Reason, State) Let the user module clean up +%%% always called when server terminates +%%% +%%% ==> ok +%%% +%%% +%%% The work flow (of the server) can be described as follows: +%%% +%%% User module Generic +%%% ----------- ------- +%%% start -----> start +%%% init <----- . +%%% +%%% loop +%%% handle_call <----- . +%%% -----> reply +%%% +%%% handle_cast <----- . +%%% +%%% handle_info <----- . +%%% +%%% terminate <----- . +%%% +%%% -----> reply +%%% +%%% +%%% --------------------------------------------------- + +%% API +-export([start/3, start/4, + start_link/3, start_link/4, + call/2, call/3, pcall/3, pcall/4, + cast/2, pcast/3, reply/2, + abcast/2, abcast/3, + multi_call/2, multi_call/3, multi_call/4, + enter_loop/3, enter_loop/4, enter_loop/5]). + +-export([behaviour_info/1]). + +%% System exports +-export([system_continue/3, + system_terminate/4, + system_code_change/4, + format_status/2]). + +%% Internal exports +-export([init_it/6, print_event/3]). + +-import(error_logger, [format/2]). + +%%%========================================================================= +%%% API +%%%========================================================================= + +behaviour_info(callbacks) -> + [{init,1},{handle_call,3},{handle_cast,2},{handle_info,2}, + {terminate,2},{code_change,3}]; +behaviour_info(_Other) -> + undefined. + +%%% ----------------------------------------------------------------- +%%% Starts a generic server. +%%% start(Mod, Args, Options) +%%% start(Name, Mod, Args, Options) +%%% start_link(Mod, Args, Options) +%%% start_link(Name, Mod, Args, Options) where: +%%% Name ::= {local, atom()} | {global, atom()} +%%% Mod ::= atom(), callback module implementing the 'real' server +%%% Args ::= term(), init arguments (to Mod:init/1) +%%% Options ::= [{timeout, Timeout} | {debug, [Flag]}] +%%% Flag ::= trace | log | {logfile, File} | statistics | debug +%%% (debug == log && statistics) +%%% Returns: {ok, Pid} | +%%% {error, {already_started, Pid}} | +%%% {error, Reason} +%%% ----------------------------------------------------------------- +start(Mod, Args, Options) -> + gen:start(?MODULE, nolink, Mod, Args, Options). + +start(Name, Mod, Args, Options) -> + gen:start(?MODULE, nolink, Name, Mod, Args, Options). + +start_link(Mod, Args, Options) -> + gen:start(?MODULE, link, Mod, Args, Options). + +start_link(Name, Mod, Args, Options) -> + gen:start(?MODULE, link, Name, Mod, Args, Options). + + +%% ----------------------------------------------------------------- +%% Make a call to a generic server. +%% If the server is located at another node, that node will +%% be monitored. +%% If the client is trapping exits and is linked server termination +%% is handled here (? Shall we do that here (or rely on timeouts) ?). +%% ----------------------------------------------------------------- +call(Name, Request) -> + case catch gen:call(Name, '$gen_call', Request) of + {ok,Res} -> + Res; + {'EXIT',Reason} -> + exit({Reason, {?MODULE, call, [Name, Request]}}) + end. + +call(Name, Request, Timeout) -> + case catch gen:call(Name, '$gen_call', Request, Timeout) of + {ok,Res} -> + Res; + {'EXIT',Reason} -> + exit({Reason, {?MODULE, call, [Name, Request, Timeout]}}) + end. + +pcall(Name, Priority, Request) -> + case catch gen:call(Name, '$gen_pcall', {Priority, Request}) of + {ok,Res} -> + Res; + {'EXIT',Reason} -> + exit({Reason, {?MODULE, pcall, [Name, Priority, Request]}}) + end. + +pcall(Name, Priority, Request, Timeout) -> + case catch gen:call(Name, '$gen_pcall', {Priority, Request}, Timeout) of + {ok,Res} -> + Res; + {'EXIT',Reason} -> + exit({Reason, {?MODULE, pcall, [Name, Priority, Request, Timeout]}}) + end. + +%% ----------------------------------------------------------------- +%% Make a cast to a generic server. +%% ----------------------------------------------------------------- +cast({global,Name}, Request) -> + catch global:send(Name, cast_msg(Request)), + ok; +cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) -> + do_cast(Dest, Request); +cast(Dest, Request) when is_atom(Dest) -> + do_cast(Dest, Request); +cast(Dest, Request) when is_pid(Dest) -> + do_cast(Dest, Request). + +do_cast(Dest, Request) -> + do_send(Dest, cast_msg(Request)), + ok. + +cast_msg(Request) -> {'$gen_cast',Request}. + +pcast({global,Name}, Priority, Request) -> + catch global:send(Name, cast_msg(Priority, Request)), + ok; +pcast({Name,Node}=Dest, Priority, Request) when is_atom(Name), is_atom(Node) -> + do_cast(Dest, Priority, Request); +pcast(Dest, Priority, Request) when is_atom(Dest) -> + do_cast(Dest, Priority, Request); +pcast(Dest, Priority, Request) when is_pid(Dest) -> + do_cast(Dest, Priority, Request). + +do_cast(Dest, Priority, Request) -> + do_send(Dest, cast_msg(Priority, Request)), + ok. + +cast_msg(Priority, Request) -> {'$gen_pcast', {Priority, Request}}. + +%% ----------------------------------------------------------------- +%% Send a reply to the client. +%% ----------------------------------------------------------------- +reply({To, Tag}, Reply) -> + catch To ! {Tag, Reply}. + +%% ----------------------------------------------------------------- +%% Asyncronous broadcast, returns nothing, it's just send'n prey +%%----------------------------------------------------------------- +abcast(Name, Request) when is_atom(Name) -> + do_abcast([node() | nodes()], Name, cast_msg(Request)). + +abcast(Nodes, Name, Request) when is_list(Nodes), is_atom(Name) -> + do_abcast(Nodes, Name, cast_msg(Request)). + +do_abcast([Node|Nodes], Name, Msg) when is_atom(Node) -> + do_send({Name,Node},Msg), + do_abcast(Nodes, Name, Msg); +do_abcast([], _,_) -> abcast. + +%%% ----------------------------------------------------------------- +%%% Make a call to servers at several nodes. +%%% Returns: {[Replies],[BadNodes]} +%%% A Timeout can be given +%%% +%%% A middleman process is used in case late answers arrives after +%%% the timeout. If they would be allowed to glog the callers message +%%% queue, it would probably become confused. Late answers will +%%% now arrive to the terminated middleman and so be discarded. +%%% ----------------------------------------------------------------- +multi_call(Name, Req) + when is_atom(Name) -> + do_multi_call([node() | nodes()], Name, Req, infinity). + +multi_call(Nodes, Name, Req) + when is_list(Nodes), is_atom(Name) -> + do_multi_call(Nodes, Name, Req, infinity). + +multi_call(Nodes, Name, Req, infinity) -> + do_multi_call(Nodes, Name, Req, infinity); +multi_call(Nodes, Name, Req, Timeout) + when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 -> + do_multi_call(Nodes, Name, Req, Timeout). + + +%%----------------------------------------------------------------- +%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>) ->_ +%% +%% Description: Makes an existing process into a gen_server. +%% The calling process will enter the gen_server receive +%% loop and become a gen_server process. +%% The process *must* have been started using one of the +%% start functions in proc_lib, see proc_lib(3). +%% The user is responsible for any initialization of the +%% process, including registering a name for it. +%%----------------------------------------------------------------- +enter_loop(Mod, Options, State) -> + enter_loop(Mod, Options, State, self(), infinity). + +enter_loop(Mod, Options, State, ServerName = {_, _}) -> + enter_loop(Mod, Options, State, ServerName, infinity); + +enter_loop(Mod, Options, State, Timeout) -> + enter_loop(Mod, Options, State, self(), Timeout). + +enter_loop(Mod, Options, State, ServerName, Timeout) -> + Name = get_proc_name(ServerName), + Parent = get_parent(), + Debug = debug_options(Name, Options), + Queue = priority_queue:new(), + loop(Parent, Name, State, Mod, Timeout, Queue, Debug). + +%%%======================================================================== +%%% Gen-callback functions +%%%======================================================================== + +%%% --------------------------------------------------- +%%% Initiate the new process. +%%% Register the name using the Rfunc function +%%% Calls the Mod:init/Args function. +%%% Finally an acknowledge is sent to Parent and the main +%%% loop is entered. +%%% --------------------------------------------------- +init_it(Starter, self, Name, Mod, Args, Options) -> + init_it(Starter, self(), Name, Mod, Args, Options); +init_it(Starter, Parent, Name, Mod, Args, Options) -> + Debug = debug_options(Name, Options), + Queue = priority_queue:new(), + case catch Mod:init(Args) of + {ok, State} -> + proc_lib:init_ack(Starter, {ok, self()}), + loop(Parent, Name, State, Mod, infinity, Queue, Debug); + {ok, State, Timeout} -> + proc_lib:init_ack(Starter, {ok, self()}), + loop(Parent, Name, State, Mod, Timeout, Queue, Debug); + {stop, Reason} -> + proc_lib:init_ack(Starter, {error, Reason}), + exit(Reason); + ignore -> + proc_lib:init_ack(Starter, ignore), + exit(normal); + {'EXIT', Reason} -> + proc_lib:init_ack(Starter, {error, Reason}), + exit(Reason); + Else -> + Error = {bad_return_value, Else}, + proc_lib:init_ack(Starter, {error, Error}), + exit(Error) + end. + +%%%======================================================================== +%%% Internal functions +%%%======================================================================== +%%% --------------------------------------------------- +%%% The MAIN loop. +%%% --------------------------------------------------- +loop(Parent, Name, State, Mod, Time, Queue, Debug) -> + receive + Input -> loop(Parent, Name, State, Mod, + Time, in(Input, Queue), Debug) + after 0 -> + case priority_queue:out(Queue) of + {{value, Msg}, Queue1} -> + process_msg(Parent, Name, State, Mod, + Time, Queue1, Debug, Msg); + {empty, Queue1} -> + receive + Input -> + loop(Parent, Name, State, Mod, + Time, in(Input, Queue1), Debug) + after Time -> + process_msg(Parent, Name, State, Mod, + Time, Queue1, Debug, timeout) + end + end + end. + +in({'$gen_pcast', {Priority, Msg}}, Queue) -> + priority_queue:in({'$gen_cast', Msg}, Priority, Queue); +in({'$gen_pcall', From, {Priority, Msg}}, Queue) -> + priority_queue:in({'$gen_call', From, Msg}, Priority, Queue); +in(Input, Queue) -> + priority_queue:in(Input, Queue). + +process_msg(Parent, Name, State, Mod, Time, Queue, Debug, Msg) -> + case Msg of + {system, From, Req} -> + sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, + [Name, State, Mod, Time, Queue]); + {'EXIT', Parent, Reason} -> + terminate(Reason, Name, Msg, Mod, State, Debug); + _Msg when Debug =:= [] -> + handle_msg(Msg, Parent, Name, State, Mod, Time, Queue); + _Msg -> + Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, + Name, {in, Msg}), + handle_msg(Msg, Parent, Name, State, Mod, Time, Queue, Debug1) + end. + +%%% --------------------------------------------------- +%%% Send/recive functions +%%% --------------------------------------------------- +do_send(Dest, Msg) -> + catch erlang:send(Dest, Msg). + +do_multi_call(Nodes, Name, Req, infinity) -> + Tag = make_ref(), + Monitors = send_nodes(Nodes, Name, Tag, Req), + rec_nodes(Tag, Monitors, Name, undefined); +do_multi_call(Nodes, Name, Req, Timeout) -> + Tag = make_ref(), + Caller = self(), + Receiver = + spawn( + fun() -> + %% Middleman process. Should be unsensitive to regular + %% exit signals. The sychronization is needed in case + %% the receiver would exit before the caller started + %% the monitor. + process_flag(trap_exit, true), + Mref = erlang:monitor(process, Caller), + receive + {Caller,Tag} -> + Monitors = send_nodes(Nodes, Name, Tag, Req), + TimerId = erlang:start_timer(Timeout, self(), ok), + Result = rec_nodes(Tag, Monitors, Name, TimerId), + exit({self(),Tag,Result}); + {'DOWN',Mref,_,_,_} -> + %% Caller died before sending us the go-ahead. + %% Give up silently. + exit(normal) + end + end), + Mref = erlang:monitor(process, Receiver), + Receiver ! {self(),Tag}, + receive + {'DOWN',Mref,_,_,{Receiver,Tag,Result}} -> + Result; + {'DOWN',Mref,_,_,Reason} -> + %% The middleman code failed. Or someone did + %% exit(_, kill) on the middleman process => Reason==killed + exit(Reason) + end. + +send_nodes(Nodes, Name, Tag, Req) -> + send_nodes(Nodes, Name, Tag, Req, []). + +send_nodes([Node|Tail], Name, Tag, Req, Monitors) + when is_atom(Node) -> + Monitor = start_monitor(Node, Name), + %% Handle non-existing names in rec_nodes. + catch {Name, Node} ! {'$gen_call', {self(), {Tag, Node}}, Req}, + send_nodes(Tail, Name, Tag, Req, [Monitor | Monitors]); +send_nodes([_Node|Tail], Name, Tag, Req, Monitors) -> + %% Skip non-atom Node + send_nodes(Tail, Name, Tag, Req, Monitors); +send_nodes([], _Name, _Tag, _Req, Monitors) -> + Monitors. + +%% Against old nodes: +%% If no reply has been delivered within 2 secs. (per node) check that +%% the server really exists and wait for ever for the answer. +%% +%% Against contemporary nodes: +%% Wait for reply, server 'DOWN', or timeout from TimerId. + +rec_nodes(Tag, Nodes, Name, TimerId) -> + rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId). + +rec_nodes(Tag, [{N,R}|Tail], Name, Badnodes, Replies, Time, TimerId ) -> + receive + {'DOWN', R, _, _, _} -> + rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId); + {{Tag, N}, Reply} -> %% Tag is bound !!! + unmonitor(R), + rec_nodes(Tag, Tail, Name, Badnodes, + [{N,Reply}|Replies], Time, TimerId); + {timeout, TimerId, _} -> + unmonitor(R), + %% Collect all replies that already have arrived + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) + end; +rec_nodes(Tag, [N|Tail], Name, Badnodes, Replies, Time, TimerId) -> + %% R6 node + receive + {nodedown, N} -> + monitor_node(N, false), + rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId); + {{Tag, N}, Reply} -> %% Tag is bound !!! + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes(Tag, Tail, Name, Badnodes, + [{N,Reply}|Replies], 2000, TimerId); + {timeout, TimerId, _} -> + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + %% Collect all replies that already have arrived + rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies) + after Time -> + case rpc:call(N, erlang, whereis, [Name]) of + Pid when is_pid(Pid) -> % It exists try again. + rec_nodes(Tag, [N|Tail], Name, Badnodes, + Replies, infinity, TimerId); + _ -> % badnode + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes(Tag, Tail, Name, [N|Badnodes], + Replies, 2000, TimerId) + end + end; +rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) -> + case catch erlang:cancel_timer(TimerId) of + false -> % It has already sent it's message + receive + {timeout, TimerId, _} -> ok + after 0 -> + ok + end; + _ -> % Timer was cancelled, or TimerId was 'undefined' + ok + end, + {Replies, Badnodes}. + +%% Collect all replies that already have arrived +rec_nodes_rest(Tag, [{N,R}|Tail], Name, Badnodes, Replies) -> + receive + {'DOWN', R, _, _, _} -> + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); + {{Tag, N}, Reply} -> %% Tag is bound !!! + unmonitor(R), + rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) + after 0 -> + unmonitor(R), + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) + end; +rec_nodes_rest(Tag, [N|Tail], Name, Badnodes, Replies) -> + %% R6 node + receive + {nodedown, N} -> + monitor_node(N, false), + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); + {{Tag, N}, Reply} -> %% Tag is bound !!! + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) + after 0 -> + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) + end; +rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) -> + {Replies, Badnodes}. + + +%%% --------------------------------------------------- +%%% Monitor functions +%%% --------------------------------------------------- + +start_monitor(Node, Name) when is_atom(Node), is_atom(Name) -> + if node() =:= nonode@nohost, Node =/= nonode@nohost -> + Ref = make_ref(), + self() ! {'DOWN', Ref, process, {Name, Node}, noconnection}, + {Node, Ref}; + true -> + case catch erlang:monitor(process, {Name, Node}) of + {'EXIT', _} -> + %% Remote node is R6 + monitor_node(Node, true), + Node; + Ref when is_reference(Ref) -> + {Node, Ref} + end + end. + +%% Cancels a monitor started with Ref=erlang:monitor(_, _). +unmonitor(Ref) when is_reference(Ref) -> + erlang:demonitor(Ref), + receive + {'DOWN', Ref, _, _, _} -> + true + after 0 -> + true + end. + +%%% --------------------------------------------------- +%%% Message handling functions +%%% --------------------------------------------------- + +dispatch({'$gen_cast', Msg}, Mod, State) -> + Mod:handle_cast(Msg, State); +dispatch(Info, Mod, State) -> + Mod:handle_info(Info, State). + +handle_msg({'$gen_call', From, Msg}, + Parent, Name, State, Mod, _Time, Queue) -> + case catch Mod:handle_call(Msg, From, State) of + {reply, Reply, NState} -> + reply(From, Reply), + loop(Parent, Name, NState, Mod, infinity, Queue, []); + {reply, Reply, NState, Time1} -> + reply(From, Reply), + loop(Parent, Name, NState, Mod, Time1, Queue, []); + {noreply, NState} -> + loop(Parent, Name, NState, Mod, infinity, Queue, []); + {noreply, NState, Time1} -> + loop(Parent, Name, NState, Mod, Time1, Queue, []); + {stop, Reason, Reply, NState} -> + {'EXIT', R} = + (catch terminate(Reason, Name, Msg, Mod, NState, [])), + reply(From, Reply), + exit(R); + Other -> handle_common_reply(Other, + Parent, Name, Msg, Mod, State, Queue) + end; +handle_msg(Msg, + Parent, Name, State, Mod, _Time, Queue) -> + Reply = (catch dispatch(Msg, Mod, State)), + handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue). + +handle_msg({'$gen_call', From, Msg}, + Parent, Name, State, Mod, _Time, Queue, Debug) -> + case catch Mod:handle_call(Msg, From, State) of + {reply, Reply, NState} -> + Debug1 = reply(Name, From, Reply, NState, Debug), + loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); + {reply, Reply, NState, Time1} -> + Debug1 = reply(Name, From, Reply, NState, Debug), + loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); + {noreply, NState} -> + Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); + {noreply, NState, Time1} -> + Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); + {stop, Reason, Reply, NState} -> + {'EXIT', R} = + (catch terminate(Reason, Name, Msg, Mod, NState, Debug)), + reply(Name, From, Reply, NState, Debug), + exit(R); + Other -> + handle_common_reply(Other, + Parent, Name, Msg, Mod, State, Queue, Debug) + end; +handle_msg(Msg, + Parent, Name, State, Mod, _Time, Queue, Debug) -> + Reply = (catch dispatch(Msg, Mod, State)), + handle_common_reply(Reply, + Parent, Name, Msg, Mod, State, Queue, Debug). + +handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue) -> + case Reply of + {noreply, NState} -> + loop(Parent, Name, NState, Mod, infinity, Queue, []); + {noreply, NState, Time1} -> + loop(Parent, Name, NState, Mod, Time1, Queue, []); + {stop, Reason, NState} -> + terminate(Reason, Name, Msg, Mod, NState, []); + {'EXIT', What} -> + terminate(What, Name, Msg, Mod, State, []); + _ -> + terminate({bad_return_value, Reply}, Name, Msg, Mod, State, []) + end. + +handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue, Debug) -> + case Reply of + {noreply, NState} -> + Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); + {noreply, NState, Time1} -> + Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); + {stop, Reason, NState} -> + terminate(Reason, Name, Msg, Mod, NState, Debug); + {'EXIT', What} -> + terminate(What, Name, Msg, Mod, State, Debug); + _ -> + terminate({bad_return_value, Reply}, Name, Msg, Mod, State, Debug) + end. + +reply(Name, {To, Tag}, Reply, State, Debug) -> + reply({To, Tag}, Reply), + sys:handle_debug(Debug, {?MODULE, print_event}, Name, + {out, Reply, To, State} ). + + +%%----------------------------------------------------------------- +%% Callback functions for system messages handling. +%%----------------------------------------------------------------- +system_continue(Parent, Debug, [Name, State, Mod, Time, Queue]) -> + loop(Parent, Name, State, Mod, Time, Queue, Debug). + +system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, _Queue]) -> + terminate(Reason, Name, [], Mod, State, Debug). + +system_code_change([Name, State, Mod, Time, Queue], _Module, OldVsn, Extra) -> + case catch Mod:code_change(OldVsn, State, Extra) of + {ok, NewState} -> {ok, [Name, NewState, Mod, Time, Queue]}; + Else -> Else + end. + +%%----------------------------------------------------------------- +%% Format debug messages. Print them as the call-back module sees +%% them, not as the real erlang messages. Use trace for that. +%%----------------------------------------------------------------- +print_event(Dev, {in, Msg}, Name) -> + case Msg of + {'$gen_call', {From, _Tag}, Call} -> + io:format(Dev, "*DBG* ~p got call ~p from ~w~n", + [Name, Call, From]); + {'$gen_cast', Cast} -> + io:format(Dev, "*DBG* ~p got cast ~p~n", + [Name, Cast]); + _ -> + io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg]) + end; +print_event(Dev, {out, Msg, To, State}, Name) -> + io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n", + [Name, Msg, To, State]); +print_event(Dev, {noreply, State}, Name) -> + io:format(Dev, "*DBG* ~p new state ~w~n", [Name, State]); +print_event(Dev, Event, Name) -> + io:format(Dev, "*DBG* ~p dbg ~p~n", [Name, Event]). + + +%%% --------------------------------------------------- +%%% Terminate the server. +%%% --------------------------------------------------- + +terminate(Reason, Name, Msg, Mod, State, Debug) -> + case catch Mod:terminate(Reason, State) of + {'EXIT', R} -> + error_info(R, Name, Msg, State, Debug), + exit(R); + _ -> + case Reason of + normal -> + exit(normal); + shutdown -> + exit(shutdown); + _ -> + error_info(Reason, Name, Msg, State, Debug), + exit(Reason) + end + end. + +error_info(_Reason, application_controller, _Msg, _State, _Debug) -> + %% OTP-5811 Don't send an error report if it's the system process + %% application_controller which is terminating - let init take care + %% of it instead + ok; +error_info(Reason, Name, Msg, State, Debug) -> + Reason1 = + case Reason of + {undef,[{M,F,A}|MFAs]} -> + case code:is_loaded(M) of + false -> + {'module could not be loaded',[{M,F,A}|MFAs]}; + _ -> + case erlang:function_exported(M, F, length(A)) of + true -> + Reason; + false -> + {'function not exported',[{M,F,A}|MFAs]} + end + end; + _ -> + Reason + end, + format("** Generic server ~p terminating \n" + "** Last message in was ~p~n" + "** When Server state == ~p~n" + "** Reason for termination == ~n** ~p~n", + [Name, Msg, State, Reason1]), + sys:print_log(Debug), + ok. + +%%% --------------------------------------------------- +%%% Misc. functions. +%%% --------------------------------------------------- + +opt(Op, [{Op, Value}|_]) -> + {ok, Value}; +opt(Op, [_|Options]) -> + opt(Op, Options); +opt(_, []) -> + false. + +debug_options(Name, Opts) -> + case opt(debug, Opts) of + {ok, Options} -> dbg_options(Name, Options); + _ -> dbg_options(Name, []) + end. + +dbg_options(Name, []) -> + Opts = + case init:get_argument(generic_debug) of + error -> + []; + _ -> + [log, statistics] + end, + dbg_opts(Name, Opts); +dbg_options(Name, Opts) -> + dbg_opts(Name, Opts). + +dbg_opts(Name, Opts) -> + case catch sys:debug_options(Opts) of + {'EXIT',_} -> + format("~p: ignoring erroneous debug options - ~p~n", + [Name, Opts]), + []; + Dbg -> + Dbg + end. + +get_proc_name(Pid) when is_pid(Pid) -> + Pid; +get_proc_name({local, Name}) -> + case process_info(self(), registered_name) of + {registered_name, Name} -> + Name; + {registered_name, _Name} -> + exit(process_not_registered); + [] -> + exit(process_not_registered) + end; +get_proc_name({global, Name}) -> + case global:safe_whereis_name(Name) of + undefined -> + exit(process_not_registered_globally); + Pid when Pid =:= self() -> + Name; + _Pid -> + exit(process_not_registered_globally) + end. + +get_parent() -> + case get('$ancestors') of + [Parent | _] when is_pid(Parent)-> + Parent; + [Parent | _] when is_atom(Parent)-> + name_to_pid(Parent); + _ -> + exit(process_was_not_started_by_proc_lib) + end. + +name_to_pid(Name) -> + case whereis(Name) of + undefined -> + case global:safe_whereis_name(Name) of + undefined -> + exit(could_not_find_registerd_name); + Pid -> + Pid + end; + Pid -> + Pid + end. + +%%----------------------------------------------------------------- +%% Status information +%%----------------------------------------------------------------- +format_status(Opt, StatusData) -> + [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time, Queue]] = + StatusData, + NameTag = if is_pid(Name) -> + pid_to_list(Name); + is_atom(Name) -> + Name + end, + Header = lists:concat(["Status for generic server ", NameTag]), + Log = sys:get_debug(log, Debug, []), + Specfic = + case erlang:function_exported(Mod, format_status, 2) of + true -> + case catch Mod:format_status(Opt, [PDict, State]) of + {'EXIT', _} -> [{data, [{"State", State}]}]; + Else -> Else + end; + _ -> + [{data, [{"State", State}]}] + end, + [{header, Header}, + {data, [{"Status", SysState}, + {"Parent", Parent}, + {"Logged events", Log}, + {"Queued messages", priority_queue:to_list(Queue)}]} | + Specfic]. diff --git a/src/priority_queue.erl b/src/priority_queue.erl new file mode 100644 index 00000000..732757c4 --- /dev/null +++ b/src/priority_queue.erl @@ -0,0 +1,153 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +%% Priority queues have essentially the same interface as ordinary +%% queues, except that a) there is an in/3 that takes a priority, and +%% b) we have only implemented the core API we need. +%% +%% Priorities should be integers - the higher the value the higher the +%% priority - but we don't actually check that. +%% +%% in/2 inserts items with priority 0. +%% +%% We optimise the case where a priority queue is being used just like +%% an ordinary queue. When that is the case we represent the priority +%% queue as an ordinary queue. We could just call into the 'queue' +%% module for that, but for efficiency we implement the relevant +%% functions directly in here, thus saving on inter-module calls and +%% eliminating a level of boxing. +%% +%% When the queue contains items with non-zero priorities, it is +%% represented as a sorted kv list with the inverted Priority as the +%% key and an ordinary queue as the value. Here again we use our own +%% ordinary queue implemention for efficiency, often making recursive +%% calls into the same function knowing that ordinary queues represent +%% a base case. + + +-module(priority_queue). + +-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, out/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(priority() :: integer()). +-type(squeue() :: {queue, [any()], [any()]}). +-type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}). + +-spec(new/0 :: () -> pqueue()). +-spec(is_queue/1 :: (any()) -> bool()). +-spec(is_empty/1 :: (pqueue()) -> bool()). +-spec(len/1 :: (pqueue()) -> non_neg_integer()). +-spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]). +-spec(in/2 :: (any(), pqueue()) -> pqueue()). +-spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()). +-spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}). + +-endif. + +%%---------------------------------------------------------------------------- + +new() -> + {queue, [], []}. + +is_queue({queue, R, F}) when is_list(R), is_list(F) -> + true; +is_queue({pqueue, Queues}) when is_list(Queues) -> + lists:all(fun ({P, Q}) -> is_integer(P) andalso is_queue(Q) end, + Queues); +is_queue(_) -> + false. + +is_empty({queue, [], []}) -> + true; +is_empty(_) -> + false. + +len({queue, R, F}) when is_list(R), is_list(F) -> + length(R) + length(F); +len({pqueue, Queues}) -> + lists:sum([len(Q) || {_, Q} <- Queues]). + +to_list({queue, In, Out}) when is_list(In), is_list(Out) -> + [{0, V} || V <- Out ++ lists:reverse(In, [])]; +to_list({pqueue, Queues}) -> + [{-P, V} || {P, Q} <- Queues, {0, V} <- to_list(Q)]. + +in(Item, Q) -> + in(Item, 0, Q). + +in(X, 0, {queue, [_] = In, []}) -> + {queue, [X], In}; +in(X, 0, {queue, In, Out}) when is_list(In), is_list(Out) -> + {queue, [X|In], Out}; +in(X, Priority, _Q = {queue, [], []}) -> + in(X, Priority, {pqueue, []}); +in(X, Priority, Q = {queue, _, _}) -> + in(X, Priority, {pqueue, [{0, Q}]}); +in(X, Priority, {pqueue, Queues}) -> + P = -Priority, + {pqueue, case lists:keysearch(P, 1, Queues) of + {value, {_, Q}} -> + lists:keyreplace(P, 1, Queues, {P, in(X, Q)}); + false -> + lists:keysort(1, [{P, {queue, [X], []}} | Queues]) + end}. + +out({queue, [], []} = Q) -> + {empty, Q}; +out({queue, [V], []}) -> + {{value, V}, {queue, [], []}}; +out({queue, [Y|In], []}) -> + [V|Out] = lists:reverse(In, []), + {{value, V}, {queue, [Y], Out}}; +out({queue, In, [V]}) when is_list(In) -> + {{value,V}, r2f(In)}; +out({queue, In,[V|Out]}) when is_list(In) -> + {{value, V}, {queue, In, Out}}; +out({pqueue, [{P, Q} | Queues]}) -> + {R, Q1} = out(Q), + NewQ = case is_empty(Q1) of + true -> case Queues of + [] -> {queue, [], []}; + [{0, OnlyQ}] -> OnlyQ; + [_|_] -> {pqueue, Queues} + end; + false -> {pqueue, [{P, Q1} | Queues]} + end, + {R, NewQ}. + +r2f([]) -> {queue, [], []}; +r2f([_] = R) -> {queue, [], R}; +r2f([X,Y]) -> {queue, [X], [Y]}; +r2f([X,Y|R]) -> {queue, [X,Y], lists:reverse(R, [])}. diff --git a/src/rabbit.erl b/src/rabbit.erl index c8c814b6..1ddb5151 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -75,19 +75,20 @@ start() -> try ok = ensure_working_log_handlers(), ok = rabbit_mnesia:ensure_mnesia_dir(), - ok = start_applications(?APPS) + ok = rabbit_misc:start_applications(?APPS) after %%give the error loggers some time to catch up timer:sleep(100) end. stop() -> - ok = stop_applications(?APPS). + ok = rabbit_misc:stop_applications(?APPS). stop_and_halt() -> spawn(fun () -> SleepTime = 1000, - rabbit_log:info("Stop-and-halt request received; halting in ~p milliseconds~n", + rabbit_log:info("Stop-and-halt request received; " + "halting in ~p milliseconds~n", [SleepTime]), timer:sleep(SleepTime), init:stop() @@ -109,34 +110,6 @@ rotate_logs(BinarySuffix) -> %%-------------------------------------------------------------------- -manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) -> - Iterate(fun (App, Acc) -> - case Do(App) of - ok -> [App | Acc]; - {error, {SkipError, _}} -> Acc; - {error, Reason} -> - lists:foreach(Undo, Acc), - throw({error, {ErrorTag, App, Reason}}) - end - end, [], Apps), - ok. - -start_applications(Apps) -> - manage_applications(fun lists:foldl/3, - fun application:start/1, - fun application:stop/1, - already_started, - cannot_start_application, - Apps). - -stop_applications(Apps) -> - manage_applications(fun lists:foldr/3, - fun application:stop/1, - fun application:start/1, - not_started, - cannot_stop_application, - Apps). - start(normal, []) -> {ok, SupPid} = rabbit_sup:start_link(), @@ -300,9 +273,14 @@ insert_default_data() -> {ok, DefaultUser} = application:get_env(default_user), {ok, DefaultPass} = application:get_env(default_pass), {ok, DefaultVHost} = application:get_env(default_vhost), + {ok, [DefaultConfigurePerm, DefaultWritePerm, DefaultReadPerm]} = + application:get_env(default_permissions), ok = rabbit_access_control:add_vhost(DefaultVHost), ok = rabbit_access_control:add_user(DefaultUser, DefaultPass), - ok = rabbit_access_control:map_user_vhost(DefaultUser, DefaultVHost), + ok = rabbit_access_control:set_permissions(DefaultUser, DefaultVHost, + DefaultConfigurePerm, + DefaultWritePerm, + DefaultReadPerm), ok. start_builtin_amq_applications() -> diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index b73090fc..54348d9a 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -34,11 +34,12 @@ -include("rabbit.hrl"). -export([check_login/2, user_pass_login/2, - check_vhost_access/2]). + check_vhost_access/2, check_resource_access/3]). -export([add_user/2, delete_user/1, change_password/2, list_users/0, lookup_user/1]). --export([add_vhost/1, delete_vhost/1, list_vhosts/0, list_vhost_users/1]). --export([list_user_vhosts/1, map_user_vhost/2, unmap_user_vhost/2]). +-export([add_vhost/1, delete_vhost/1, list_vhosts/0]). +-export([set_permissions/5, clear_permissions/2, + list_vhost_permissions/1, list_user_permissions/1]). %%---------------------------------------------------------------------------- @@ -47,6 +48,8 @@ -spec(check_login/2 :: (binary(), binary()) -> user()). -spec(user_pass_login/2 :: (username(), password()) -> user()). -spec(check_vhost_access/2 :: (user(), vhost()) -> 'ok'). +-spec(check_resource_access/3 :: + (username(), r(atom()), non_neg_integer()) -> 'ok'). -spec(add_user/2 :: (username(), password()) -> 'ok'). -spec(delete_user/1 :: (username()) -> 'ok'). -spec(change_password/2 :: (username(), password()) -> 'ok'). @@ -55,10 +58,13 @@ -spec(add_vhost/1 :: (vhost()) -> 'ok'). -spec(delete_vhost/1 :: (vhost()) -> 'ok'). -spec(list_vhosts/0 :: () -> [vhost()]). --spec(list_vhost_users/1 :: (vhost()) -> [username()]). --spec(list_user_vhosts/1 :: (username()) -> [vhost()]). --spec(map_user_vhost/2 :: (username(), vhost()) -> 'ok'). --spec(unmap_user_vhost/2 :: (username(), vhost()) -> 'ok'). +-spec(set_permissions/5 :: + (username(), vhost(), regexp(), regexp(), regexp()) -> 'ok'). +-spec(clear_permissions/2 :: (username(), vhost()) -> 'ok'). +-spec(list_vhost_permissions/1 :: + (vhost()) -> [{username(), regexp(), regexp(), regexp()}]). +-spec(list_user_permissions/1 :: + (username()) -> [{vhost(), regexp(), regexp(), regexp()}]). -endif. @@ -112,9 +118,9 @@ internal_lookup_vhost_access(Username, VHostPath) -> %% TODO: use dirty ops instead rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:match_object( - #user_vhost{username = Username, - virtual_host = VHostPath}) of + case mnesia:read({rabbit_user_permission, + #user_vhost{username = Username, + virtual_host = VHostPath}}) of [] -> not_found; [R] -> {ok, R} end @@ -131,13 +137,47 @@ check_vhost_access(#user{username = Username}, VHostPath) -> [VHostPath, Username]) end. +check_resource_access(Username, + R = #resource{kind = exchange, name = <<"">>}, + Permission) -> + check_resource_access(Username, + R#resource{name = <<"amq.default">>}, + Permission); +check_resource_access(_Username, + #resource{name = <<"amq.gen",_/binary>>}, + _Permission) -> + ok; +check_resource_access(Username, + R = #resource{virtual_host = VHostPath, name = Name}, + Permission) -> + Res = case mnesia:dirty_read({rabbit_user_permission, + #user_vhost{username = Username, + virtual_host = VHostPath}}) of + [] -> + false; + [#user_permission{permission = P}] -> + case regexp:match( + binary_to_list(Name), + binary_to_list(element(Permission, P))) of + {match, _, _} -> true; + nomatch -> false + end + end, + if Res -> ok; + true -> rabbit_misc:protocol_error( + access_refused, "access to ~s refused for user '~s'", + [rabbit_misc:rs(R), Username]) + end. + add_user(Username, Password) -> R = rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:read({user, Username}) of + case mnesia:wread({rabbit_user, Username}) of [] -> - ok = mnesia:write(#user{username = Username, - password = Password}); + ok = mnesia:write(rabbit_user, + #user{username = Username, + password = Password}, + write); _ -> mnesia:abort({user_already_exists, Username}) end @@ -150,8 +190,17 @@ delete_user(Username) -> rabbit_misc:with_user( Username, fun () -> - ok = mnesia:delete({user, Username}), - ok = mnesia:delete({user_vhost, Username}) + ok = mnesia:delete({rabbit_user, Username}), + [ok = mnesia:delete_object( + rabbit_user_permission, R, write) || + R <- mnesia:match_object( + rabbit_user_permission, + #user_permission{user_vhost = #user_vhost{ + username = Username, + virtual_host = '_'}, + permission = '_'}, + write)], + ok end)), rabbit_log:info("Deleted user ~p~n", [Username]), R. @@ -161,24 +210,28 @@ change_password(Username, Password) -> rabbit_misc:with_user( Username, fun () -> - ok = mnesia:write(#user{username = Username, - password = Password}) + ok = mnesia:write(rabbit_user, + #user{username = Username, + password = Password}, + write) end)), rabbit_log:info("Changed password for user ~p~n", [Username]), R. list_users() -> - mnesia:dirty_all_keys(user). + mnesia:dirty_all_keys(rabbit_user). lookup_user(Username) -> - rabbit_misc:dirty_read({user, Username}). + rabbit_misc:dirty_read({rabbit_user, Username}). add_vhost(VHostPath) -> R = rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:read({vhost, VHostPath}) of + case mnesia:wread({rabbit_vhost, VHostPath}) of [] -> - ok = mnesia:write(#vhost{virtual_host = VHostPath}), + ok = mnesia:write(rabbit_vhost, + #vhost{virtual_host = VHostPath}, + write), [rabbit_exchange:declare( rabbit_misc:r(VHostPath, exchange, Name), Type, true, false, []) || @@ -186,6 +239,8 @@ add_vhost(VHostPath) -> [{<<"">>, direct}, {<<"amq.direct">>, direct}, {<<"amq.topic">>, topic}, + {<<"amq.match">>, headers}, %% per 0-9-1 pdf + {<<"amq.headers">>, headers}, %% per 0-9-1 xml {<<"amq.fanout">>, fanout}]], ok; [_] -> @@ -218,53 +273,79 @@ internal_delete_vhost(VHostPath) -> ok = rabbit_exchange:delete(Name, false) end, rabbit_exchange:list(VHostPath)), - lists:foreach(fun (Username) -> - ok = unmap_user_vhost(Username, VHostPath) + lists:foreach(fun ({Username, _, _, _}) -> + ok = clear_permissions(Username, VHostPath) end, - list_vhost_users(VHostPath)), - ok = mnesia:delete({vhost, VHostPath}), + list_vhost_permissions(VHostPath)), + ok = mnesia:delete({rabbit_vhost, VHostPath}), ok. list_vhosts() -> - mnesia:dirty_all_keys(vhost). + mnesia:dirty_all_keys(rabbit_vhost). -list_vhost_users(VHostPath) -> - [Username || - #user_vhost{username = Username} <- - %% TODO: use dirty ops instead - rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_vhost( - VHostPath, - fun () -> mnesia:index_read(user_vhost, VHostPath, - #user_vhost.virtual_host) - end))]. - -list_user_vhosts(Username) -> - [VHostPath || - #user_vhost{virtual_host = VHostPath} <- - %% TODO: use dirty ops instead - rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_user( - Username, - fun () -> mnesia:read({user_vhost, Username}) end))]. +validate_regexp(RegexpBin) -> + Regexp = binary_to_list(RegexpBin), + case regexp:parse(Regexp) of + {ok, _} -> ok; + {error, Reason} -> throw({error, {invalid_regexp, Regexp, Reason}}) + end. -map_user_vhost(Username, VHostPath) -> +set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> + lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]), rabbit_misc:execute_mnesia_transaction( rabbit_misc:with_user_and_vhost( Username, VHostPath, - fun () -> - ok = mnesia:write( - #user_vhost{username = Username, - virtual_host = VHostPath}) + fun () -> ok = mnesia:write( + rabbit_user_permission, + #user_permission{user_vhost = #user_vhost{ + username = Username, + virtual_host = VHostPath}, + permission = #permission{ + configure = ConfigurePerm, + write = WritePerm, + read = ReadPerm}}, + write) end)). -unmap_user_vhost(Username, VHostPath) -> +clear_permissions(Username, VHostPath) -> rabbit_misc:execute_mnesia_transaction( rabbit_misc:with_user_and_vhost( Username, VHostPath, fun () -> - ok = mnesia:delete_object( - #user_vhost{username = Username, - virtual_host = VHostPath}) + ok = mnesia:delete({rabbit_user_permission, + #user_vhost{username = Username, + virtual_host = VHostPath}}) end)). +list_vhost_permissions(VHostPath) -> + [{Username, ConfigurePerm, WritePerm, ReadPerm} || + {Username, _, ConfigurePerm, WritePerm, ReadPerm} <- + list_permissions(rabbit_misc:with_vhost( + VHostPath, match_user_vhost('_', VHostPath)))]. + +list_user_permissions(Username) -> + [{VHostPath, ConfigurePerm, WritePerm, ReadPerm} || + {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <- + list_permissions(rabbit_misc:with_user( + Username, match_user_vhost(Username, '_')))]. + +list_permissions(QueryThunk) -> + [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} || + #user_permission{user_vhost = #user_vhost{username = Username, + virtual_host = VHostPath}, + permission = #permission{ + configure = ConfigurePerm, + write = WritePerm, + read = ReadPerm}} <- + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction(QueryThunk)]. + +match_user_vhost(Username, VHostPath) -> + fun () -> mnesia:match_object( + rabbit_user_permission, + #user_permission{user_vhost = #user_vhost{ + username = Username, + virtual_host = VHostPath}, + permission = '_'}, + read) + end. diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index dee71d23..21999f16 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -53,7 +53,7 @@ -spec(start/1 :: (bool() | 'auto') -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(register/2 :: (pid(), mfa_tuple()) -> 'ok'). - + -endif. %%---------------------------------------------------------------------------- @@ -78,7 +78,8 @@ stop() -> register(Pid, HighMemMFA) -> ok = gen_event:call(alarm_handler, ?MODULE, - {register, Pid, HighMemMFA}). + {register, Pid, HighMemMFA}, + infinity). %%---------------------------------------------------------------------------- @@ -101,7 +102,7 @@ handle_call({register, Pid, HighMemMFA}, end, NewAlertees = dict:store(Pid, HighMemMFA, Alertess), {ok, ok, State#alarms{alertees = NewAlertees}}; - + handle_call(_Request, State) -> {ok, not_understood, State}. @@ -135,7 +136,7 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- start_memsup() -> - Mod = case os:type() of + Mod = case os:type() of %% memsup doesn't take account of buffers or cache when %% considering "free" memory - therefore on Linux we can %% get memory alarms very easily without any pressure @@ -143,7 +144,7 @@ start_memsup() -> %% our own simple memory monitor. %% {unix, linux} -> rabbit_memsup_linux; - + %% Start memsup programmatically rather than via the %% rabbitmq-server script. This is not quite the right %% thing to do as os_mon checks to see if memsup is diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 382810c3..eb076e94 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -37,13 +37,13 @@ stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]). -export([list/1, info/1, info/2, info_all/1, info_all/2]). -export([claim_queue/2]). --export([basic_get/3, basic_consume/7, basic_cancel/4]). --export([notify_sent/2]). --export([commit_all/2, rollback_all/2, notify_down_all/2]). +-export([basic_get/3, basic_consume/8, basic_cancel/4]). +-export([notify_sent/2, unblock/2]). +-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -import(mnesia). --import(gen_server). +-import(gen_server2). -import(lists). -import(queue). @@ -91,15 +91,17 @@ -spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). +-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> {'ok', non_neg_integer(), msg()} | 'empty'). --spec(basic_consume/7 :: - (amqqueue(), bool(), pid(), pid(), ctag(), bool(), any()) -> +-spec(basic_consume/8 :: + (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) -> 'ok' | {'error', 'queue_owned_by_another_connection' | 'exclusive_consume_unavailable'}). -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). +-spec(unblock/2 :: (pid(), pid()) -> 'ok'). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). @@ -130,7 +132,7 @@ recover_durable_queues() -> %% re-created it). case rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:match_object( - durable_queues, RecoveredQ, read) of + rabbit_durable_queue, RecoveredQ, read) of [_] -> ok = store_queue(Q), true; [] -> false @@ -144,7 +146,7 @@ recover_durable_queues() -> rabbit_misc:execute_mnesia_transaction( fun () -> qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} - <- mnesia:table(durable_queues), + <- mnesia:table(rabbit_durable_queue), node(Pid) == Node])) end)), ok. @@ -157,7 +159,7 @@ declare(QueueName, Durable, AutoDelete, Args) -> pid = none}), case rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({amqqueue, QueueName}) of + case mnesia:wread({rabbit_queue, QueueName}) of [] -> ok = store_queue(Q), ok = add_default_binding(Q), Q; @@ -170,11 +172,11 @@ declare(QueueName, Durable, AutoDelete, Args) -> end. store_queue(Q = #amqqueue{durable = true}) -> - ok = mnesia:write(durable_queues, Q, write), - ok = mnesia:write(Q), + ok = mnesia:write(rabbit_durable_queue, Q, write), + ok = mnesia:write(rabbit_queue, Q, write), ok; store_queue(Q = #amqqueue{durable = false}) -> - ok = mnesia:write(Q), + ok = mnesia:write(rabbit_queue, Q, write), ok. start_queue_process(Q) -> @@ -188,7 +190,7 @@ add_default_binding(#amqqueue{name = QueueName}) -> ok. lookup(Name) -> - rabbit_misc:dirty_read({amqqueue, Name}). + rabbit_misc:dirty_read({rabbit_queue, Name}). with(Name, F, E) -> case lookup(Name) of @@ -205,15 +207,16 @@ with_or_die(Name, F) -> list(VHostPath) -> mnesia:dirty_match_object( + rabbit_queue, #amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'}). map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> - gen_server:call(QPid, info). + gen_server2:pcall(QPid, 9, info, infinity). info(#amqqueue{ pid = QPid }, Items) -> - case gen_server:call(QPid, {info, Items}) of + case gen_server2:pcall(QPid, 9, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -222,82 +225,91 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). -stat(#amqqueue{pid = QPid}) -> gen_server:call(QPid, stat). +stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity). stat_all() -> - lists:map(fun stat/1, rabbit_misc:dirty_read_all(amqqueue)). + lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> - gen_server:call(QPid, {delete, IfUnused, IfEmpty}). + gen_server2:call(QPid, {delete, IfUnused, IfEmpty}, infinity). -purge(#amqqueue{ pid = QPid }) -> gen_server:call(QPid, purge). +purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity). deliver(_IsMandatory, true, Txn, Message, QPid) -> - gen_server:call(QPid, {deliver_immediately, Txn, Message}); + gen_server2:call(QPid, {deliver_immediately, Txn, Message}, infinity); deliver(true, _IsImmediate, Txn, Message, QPid) -> - gen_server:call(QPid, {deliver, Txn, Message}), + gen_server2:call(QPid, {deliver, Txn, Message}, infinity), true; deliver(false, _IsImmediate, Txn, Message, QPid) -> - gen_server:cast(QPid, {deliver, Txn, Message}), + gen_server2:cast(QPid, {deliver, Txn, Message}), true. redeliver(QPid, Messages) -> - gen_server:cast(QPid, {redeliver, Messages}). + gen_server2:cast(QPid, {redeliver, Messages}). requeue(QPid, MsgIds, ChPid) -> - gen_server:cast(QPid, {requeue, MsgIds, ChPid}). + gen_server2:cast(QPid, {requeue, MsgIds, ChPid}). ack(QPid, Txn, MsgIds, ChPid) -> - gen_server:cast(QPid, {ack, Txn, MsgIds, ChPid}). + gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}). commit_all(QPids, Txn) -> - Timeout = length(QPids) * ?CALL_TIMEOUT, safe_pmap_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end, + fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, infinity) end, QPids). rollback_all(QPids, Txn) -> safe_pmap_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end, + fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn}) end, QPids). notify_down_all(QPids, ChPid) -> - Timeout = length(QPids) * ?CALL_TIMEOUT, safe_pmap_ok( %% we don't care if the queue process has terminated in the %% meantime fun (_) -> ok end, - fun (QPid) -> gen_server:call(QPid, {notify_down, ChPid}, Timeout) end, + fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end, QPids). +limit_all(QPids, ChPid, LimiterPid) -> + safe_pmap_ok( + fun (_) -> ok end, + fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end, + QPids). + claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> - gen_server:call(QPid, {claim_queue, ReaderPid}). + gen_server2:call(QPid, {claim_queue, ReaderPid}, infinity). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> - gen_server:call(QPid, {basic_get, ChPid, NoAck}). + gen_server2:call(QPid, {basic_get, ChPid, NoAck}, infinity). -basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, +basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> - gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, - ConsumerTag, ExclusiveConsume, OkMsg}). + gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, + LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, + infinity). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> - ok = gen_server:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). + ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}, + infinity). notify_sent(QPid, ChPid) -> - gen_server:cast(QPid, {notify_sent, ChPid}). + gen_server2:cast(QPid, {notify_sent, ChPid}). + +unblock(QPid, ChPid) -> + gen_server2:cast(QPid, {unblock, ChPid}). internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({amqqueue, QueueName}) of + case mnesia:wread({rabbit_queue, QueueName}) of [] -> {error, not_found}; [_] -> ok = rabbit_exchange:delete_queue_bindings(QueueName), - ok = mnesia:delete({amqqueue, QueueName}), - ok = mnesia:delete({durable_queues, QueueName}), + ok = mnesia:delete({rabbit_queue, QueueName}), + ok = mnesia:delete({rabbit_durable_queue, QueueName}), ok end end). @@ -309,12 +321,12 @@ on_node_down(Node) -> fun (QueueName, Acc) -> ok = rabbit_exchange:delete_transient_queue_bindings( QueueName), - ok = mnesia:delete({amqqueue, QueueName}), + ok = mnesia:delete({rabbit_queue, QueueName}), Acc end, ok, qlc:q([QueueName || #amqqueue{name = QueueName, pid = Pid} - <- mnesia:table(amqqueue), + <- mnesia:table(rabbit_queue), node(Pid) == Node])) end). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6282a8fb..c390b2b7 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -33,7 +33,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --behaviour(gen_server). +-behaviour(gen_server2). -define(UNSENT_MESSAGE_LIMIT, 100). -define(HIBERNATE_AFTER, 1000). @@ -62,9 +62,10 @@ %% These are held in our process dictionary -record(cr, {consumers, ch_pid, + limiter_pid, monitor_ref, unacked_messages, - is_overload_protection_active, + is_limit_active, unsent_message_count}). -define(INFO_KEYS, @@ -85,7 +86,7 @@ %%---------------------------------------------------------------------------- start_link(Q) -> - gen_server:start_link(?MODULE, Q, []). + gen_server2:start_link(?MODULE, Q, []). %%---------------------------------------------------------------------------- @@ -131,7 +132,7 @@ ch_record(ChPid) -> ch_pid = ChPid, monitor_ref = MonitorRef, unacked_messages = dict:new(), - is_overload_protection_active = false, + is_limit_active = false, unsent_message_count = 0}, put(Key, C), C; @@ -144,20 +145,16 @@ store_ch_record(C = #cr{ch_pid = ChPid}) -> all_ch_record() -> [C || {{ch, _}, C} <- get()]. -update_store_and_maybe_block_ch( - C = #cr{is_overload_protection_active = Active, - unsent_message_count = Count}) -> - {Result, NewActive} = - if - not(Active) and (Count > ?UNSENT_MESSAGE_LIMIT) -> - {block_ch, true}; - Active and (Count == 0) -> - {unblock_ch, false}; - true -> - {ok, Active} - end, - store_ch_record(C#cr{is_overload_protection_active = NewActive}), - Result. +is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> + Limited orelse Count > ?UNSENT_MESSAGE_LIMIT. + +ch_record_state_transition(OldCR, NewCR) -> + BlockedOld = is_ch_blocked(OldCR), + BlockedNew = is_ch_blocked(NewCR), + if BlockedOld andalso not(BlockedNew) -> unblock; + BlockedNew andalso not(BlockedOld) -> block; + true -> ok + end. deliver_immediately(Message, Delivered, State = #q{q = #amqqueue{name = QName}, @@ -168,26 +165,37 @@ deliver_immediately(Message, Delivered, {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}}, RoundRobinTail} -> - rabbit_channel:deliver( - ChPid, ConsumerTag, AckRequired, - {QName, self(), NextId, Delivered, Message}), - C = #cr{unsent_message_count = Count, + C = #cr{limiter_pid = LimiterPid, + unsent_message_count = Count, unacked_messages = UAM} = ch_record(ChPid), - NewUAM = case AckRequired of - true -> dict:store(NextId, Message, UAM); - false -> UAM - end, - NewConsumers = - case update_store_and_maybe_block_ch( - C#cr{unsent_message_count = Count + 1, - unacked_messages = NewUAM}) of - ok -> queue:in(QEntry, RoundRobinTail); - block_ch -> block_consumers(ChPid, RoundRobinTail) - end, - {offered, AckRequired, State#q{round_robin = NewConsumers, - next_msg_id = NextId +1}}; + case not(AckRequired) orelse rabbit_limiter:can_send( + LimiterPid, self()) of + true -> + rabbit_channel:deliver( + ChPid, ConsumerTag, AckRequired, + {QName, self(), NextId, Delivered, Message}), + NewUAM = case AckRequired of + true -> dict:store(NextId, Message, UAM); + false -> UAM + end, + NewC = C#cr{unsent_message_count = Count + 1, + unacked_messages = NewUAM}, + store_ch_record(NewC), + NewConsumers = + case ch_record_state_transition(C, NewC) of + ok -> queue:in(QEntry, RoundRobinTail); + block -> block_consumers(ChPid, RoundRobinTail) + end, + {offered, AckRequired, State#q{round_robin = NewConsumers, + next_msg_id = NextId + 1}}; + false -> + store_ch_record(C#cr{is_limit_active = true}), + NewConsumers = block_consumers(ChPid, RoundRobinTail), + deliver_immediately(Message, Delivered, + State#q{round_robin = NewConsumers}) + end; {empty, _} -> - not_offered + {not_offered, State} end. attempt_delivery(none, Message, State) -> @@ -198,8 +206,8 @@ attempt_delivery(none, Message, State) -> persist_message(none, qname(State), Message), persist_delivery(qname(State), Message, false), {true, State1}; - not_offered -> - {false, State} + {not_offered, State1} -> + {false, State1} end; attempt_delivery(Txn, Message, State) -> persist_message(Txn, qname(State), Message), @@ -237,16 +245,22 @@ block_consumer(ChPid, ConsumerTag, RoundRobin) -> (CP /= ChPid) or (CT /= ConsumerTag) end, queue:to_list(RoundRobin))). -possibly_unblock(C = #cr{consumers = Consumers, ch_pid = ChPid}, - State = #q{round_robin = RoundRobin}) -> - case update_store_and_maybe_block_ch(C) of - ok -> +possibly_unblock(State, ChPid, Update) -> + case lookup_ch(ChPid) of + not_found -> State; - unblock_ch -> - run_poke_burst(State#q{round_robin = - unblock_consumers(ChPid, Consumers, RoundRobin)}) + C -> + NewC = Update(C), + store_ch_record(NewC), + case ch_record_state_transition(C, NewC) of + ok -> State; + unblock -> NewRR = unblock_consumers(ChPid, + NewC#cr.consumers, + State#q.round_robin), + run_poke_burst(State#q{round_robin = NewRR}) + end end. - + check_auto_delete(State = #q{q = #amqqueue{auto_delete = false}}) -> {continue, State}; check_auto_delete(State = #q{has_had_consumers = false}) -> @@ -301,7 +315,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, {stop, normal, NewState} end end. - + cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) -> none; cancel_holder(_ChPid, _ConsumerTag, Holder) -> @@ -334,8 +348,8 @@ run_poke_burst(MessageBuffer, State) -> {offered, false, NewState} -> persist_auto_ack(qname(State), Message), run_poke_burst(BufferTail, NewState); - not_offered -> - State#q{message_buffer = MessageBuffer} + {not_offered, NewState} -> + NewState#q{message_buffer = MessageBuffer} end; {empty, _} -> State#q{message_buffer = MessageBuffer} @@ -500,8 +514,8 @@ i(messages_uncommitted, _) -> #tx{pending_messages = Pending} <- all_tx_record()]); i(messages, State) -> lists:sum([i(Item, State) || Item <- [messages_ready, - messages_unacknowledged, - messages_uncommitted]]); + messages_unacknowledged, + messages_uncommitted]]); i(acks_uncommitted, _) -> lists:sum([length(Pending) || #tx{pending_acks = Pending} <- all_tx_record()]); @@ -552,14 +566,14 @@ handle_call({deliver, Txn, Message}, _From, State) -> handle_call({commit, Txn}, From, State) -> ok = commit_work(Txn, qname(State)), %% optimisation: we reply straight away so the sender can continue - gen_server:reply(From, ok), + gen_server2:reply(From, ok), NewState = process_pending(Txn, State), erase_tx(Txn), noreply(NewState); handle_call({notify_down, ChPid}, From, State) -> %% optimisation: we reply straight away so the sender can continue - gen_server:reply(From, ok), + gen_server2:reply(From, ok), handle_ch_down(ChPid, State); handle_call({basic_get, ChPid, NoAck}, _From, @@ -586,8 +600,8 @@ handle_call({basic_get, ChPid, NoAck}, _From, reply(empty, State) end; -handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, - ExclusiveConsume, OkMsg}, +handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, + ConsumerTag, ExclusiveConsume, OkMsg}, _From, State = #q{owner = Owner, exclusive_consumer = ExistingHolder, round_robin = RoundRobin}) -> @@ -601,8 +615,13 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, ok -> C = #cr{consumers = Consumers} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)}, - C1 = C#cr{consumers = [Consumer | Consumers]}, - store_ch_record(C1), + store_ch_record(C#cr{consumers = [Consumer | Consumers], + limiter_pid = LimiterPid}), + if Consumers == [] -> + ok = rabbit_limiter:register(LimiterPid, self()); + true -> + ok + end, State1 = State#q{has_had_consumers = true, exclusive_consumer = if @@ -622,12 +641,16 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, not_found -> ok = maybe_send_reply(ChPid, OkMsg), reply(ok, State); - C = #cr{consumers = Consumers} -> + C = #cr{consumers = Consumers, limiter_pid = LimiterPid} -> NewConsumers = lists:filter (fun (#consumer{tag = CT}) -> CT /= ConsumerTag end, Consumers), - C1 = C#cr{consumers = NewConsumers}, - store_ch_record(C1), + store_ch_record(C#cr{consumers = NewConsumers}), + if NewConsumers == [] -> + ok = rabbit_limiter:unregister(LimiterPid, self()); + true -> + ok + end, ok = maybe_send_reply(ChPid, OkMsg), case check_auto_delete( State#q{exclusive_consumer = cancel_holder(ChPid, @@ -730,14 +753,33 @@ handle_cast({requeue, MsgIds, ChPid}, State) -> [{Message, true} || Message <- Messages], State)) end; +handle_cast({unblock, ChPid}, State) -> + noreply( + possibly_unblock(State, ChPid, + fun (C) -> C#cr{is_limit_active = false} end)); + handle_cast({notify_sent, ChPid}, State) -> - case lookup_ch(ChPid) of - not_found -> noreply(State); - T = #cr{unsent_message_count =Count} -> - noreply(possibly_unblock( - T#cr{unsent_message_count = Count - 1}, - State)) - end. + noreply( + possibly_unblock(State, ChPid, + fun (C = #cr{unsent_message_count = Count}) -> + C#cr{unsent_message_count = Count - 1} + end)); + +handle_cast({limit, ChPid, LimiterPid}, State) -> + noreply( + possibly_unblock( + State, ChPid, + fun (C = #cr{consumers = Consumers, + limiter_pid = OldLimiterPid, + is_limit_active = Limited}) -> + if Consumers =/= [] andalso OldLimiterPid == undefined -> + ok = rabbit_limiter:register(LimiterPid, self()); + true -> + ok + end, + NewLimited = Limited andalso LimiterPid =/= undefined, + C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} + end)). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> @@ -758,7 +800,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_info(timeout, State) -> %% TODO: Once we drop support for R11B-5, we can change this to %% {noreply, State, hibernate}; - proc_lib:hibernate(gen_server, enter_loop, [?MODULE, [], State]); + proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]); handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 5fd9a512..b2716ec4 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -33,23 +33,29 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/4, do/2, do/3, shutdown/1]). +-behaviour(gen_server2). + +-export([start_link/5, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, conserve_memory/2]). -%% callbacks --export([init/2, handle_message/2]). +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). --record(ch, {state, proxy_pid, reader_pid, writer_pid, +-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, consumer_mapping}). +-define(HIBERNATE_AFTER, 1000). + +-define(MAX_PERMISSION_CACHE_SIZE, 12). + %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(start_link/4 :: (pid(), pid(), username(), vhost()) -> pid()). +-spec(start_link/5 :: + (channel_number(), pid(), pid(), username(), vhost()) -> pid()). -spec(do/2 :: (pid(), amqp_method()) -> 'ok'). -spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). @@ -61,112 +67,126 @@ %%---------------------------------------------------------------------------- -start_link(ReaderPid, WriterPid, Username, VHost) -> - buffering_proxy:start_link(?MODULE, [ReaderPid, WriterPid, - Username, VHost]). +start_link(Channel, ReaderPid, WriterPid, Username, VHost) -> + {ok, Pid} = gen_server2:start_link( + ?MODULE, [Channel, ReaderPid, WriterPid, + Username, VHost], []), + Pid. do(Pid, Method) -> do(Pid, Method, none). do(Pid, Method, Content) -> - Pid ! {method, Method, Content}, - ok. + gen_server2:cast(Pid, {method, Method, Content}). shutdown(Pid) -> - Pid ! terminate, - ok. + gen_server2:cast(Pid, terminate). send_command(Pid, Msg) -> - Pid ! {command, Msg}, - ok. + gen_server2:cast(Pid, {command, Msg}). deliver(Pid, ConsumerTag, AckRequired, Msg) -> - Pid ! {deliver, ConsumerTag, AckRequired, Msg}, - ok. + gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). conserve_memory(Pid, Conserve) -> - Pid ! {conserve_memory, Conserve}, - ok. + gen_server2:cast(Pid, {conserve_memory, Conserve}). %%--------------------------------------------------------------------------- -init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> +init([Channel, ReaderPid, WriterPid, Username, VHost]) -> process_flag(trap_exit, true), link(WriterPid), - %% this is bypassing the proxy so alarms can "jump the queue" and - %% be handled promptly rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), - #ch{state = starting, - proxy_pid = ProxyPid, - reader_pid = ReaderPid, - writer_pid = WriterPid, - transaction_id = none, - tx_participants = sets:new(), - next_tag = 1, - uncommitted_ack_q = queue:new(), - unacked_message_q = queue:new(), - username = Username, - virtual_host = VHost, - most_recently_declared_queue = <<>>, - consumer_mapping = dict:new()}. - -handle_message({method, Method, Content}, State) -> + {ok, #ch{state = starting, + channel = Channel, + reader_pid = ReaderPid, + writer_pid = WriterPid, + limiter_pid = undefined, + transaction_id = none, + tx_participants = sets:new(), + next_tag = 1, + uncommitted_ack_q = queue:new(), + unacked_message_q = queue:new(), + username = Username, + virtual_host = VHost, + most_recently_declared_queue = <<>>, + consumer_mapping = dict:new()}}. + +handle_call(_Request, _From, State) -> + noreply(State). + +handle_cast({method, Method, Content}, State) -> try handle_method(Method, Content, State) of {reply, Reply, NewState} -> ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply), - NewState; + noreply(NewState); {noreply, NewState} -> - NewState; + noreply(NewState); stop -> - exit(normal) + {stop, normal, State#ch{state = terminating}} catch exit:{amqp, Error, Explanation, none} -> - terminate({amqp, Error, Explanation, - rabbit_misc:method_record_type(Method)}, - State); + ok = notify_queues(internal_rollback(State)), + Reason = {amqp, Error, Explanation, + rabbit_misc:method_record_type(Method)}, + State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, + {stop, normal, State#ch{state = terminating}}; exit:normal -> - terminate(normal, State); + {stop, normal, State}; _:Reason -> - terminate({Reason, erlang:get_stacktrace()}, State) + {stop, {Reason, erlang:get_stacktrace()}, State} end; -handle_message(terminate, State) -> - terminate(normal, State); +handle_cast(terminate, State) -> + {stop, normal, State}; -handle_message({command, Msg}, State = #ch{writer_pid = WriterPid}) -> +handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, Msg), - State; + noreply(State); -handle_message({deliver, ConsumerTag, AckRequired, Msg}, - State = #ch{proxy_pid = ProxyPid, - writer_pid = WriterPid, - next_tag = DeliveryTag}) -> +handle_cast({deliver, ConsumerTag, AckRequired, Msg}, + State = #ch{writer_pid = WriterPid, + next_tag = DeliveryTag}) -> State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State), - ok = internal_deliver(WriterPid, ProxyPid, - true, ConsumerTag, DeliveryTag, Msg), - State1#ch{next_tag = DeliveryTag + 1}; + ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg), + noreply(State1#ch{next_tag = DeliveryTag + 1}); -handle_message({conserve_memory, Conserve}, State) -> +handle_cast({conserve_memory, Conserve}, State) -> + ok = clear_permission_cache(), ok = rabbit_writer:send_command( State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}), - State; + noreply(State). -handle_message({'EXIT', _Pid, Reason}, State) -> - terminate(Reason, State); +handle_info({'EXIT', _Pid, Reason}, State) -> + {stop, Reason, State}; -handle_message(Other, State) -> - terminate({unexpected_channel_message, Other}, State). +handle_info(timeout, State) -> + ok = clear_permission_cache(), + %% TODO: Once we drop support for R11B-5, we can change this to + %% {noreply, State, hibernate}; + proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]). -%%--------------------------------------------------------------------------- +terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid, + state = terminating}) -> + rabbit_writer:shutdown(WriterPid), + rabbit_limiter:shutdown(LimiterPid); -terminate(Reason, State = #ch{writer_pid = WriterPid}) -> +terminate(Reason, State = #ch{writer_pid = WriterPid, + limiter_pid = LimiterPid}) -> Res = notify_queues(internal_rollback(State)), case Reason of normal -> ok = Res; _ -> ok end, rabbit_writer:shutdown(WriterPid), - exit(Reason). + rabbit_limiter:shutdown(LimiterPid). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%--------------------------------------------------------------------------- + +noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}. return_ok(State, true, _Msg) -> {noreply, State}; return_ok(State, false, Msg) -> {reply, Msg, State}. @@ -190,6 +210,35 @@ return_queue_declare_ok(State, NoWait, Q) -> {reply, Reply, NewState} end. +check_resource_access(Username, Resource, Perm) -> + V = {Resource, Perm}, + Cache = case get(permission_cache) of + undefined -> []; + Other -> Other + end, + CacheTail = + case lists:member(V, Cache) of + true -> lists:delete(V, Cache); + false -> ok = rabbit_access_control:check_resource_access( + Username, Resource, Perm), + lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE - 1) + end, + put(permission_cache, [V | CacheTail]), + ok. + +clear_permission_cache() -> + erase(permission_cache), + ok. + +check_configure_permitted(Resource, #ch{ username = Username}) -> + check_resource_access(Username, Resource, #permission.configure). + +check_write_permitted(Resource, #ch{ username = Username}) -> + check_resource_access(Username, Resource, #permission.write). + +check_read_permitted(Resource, #ch{ username = Username}) -> + check_resource_access(Username, Resource, #permission.read). + expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( not_allowed, "no previously declared queue", []); @@ -248,7 +297,6 @@ handle_method(_Method, _, #ch{state = starting}) -> handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> ok = notify_queues(internal_rollback(State)), ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}), - ok = rabbit_writer:shutdown(WriterPid), stop; handle_method(#'access.request'{},_, State) -> @@ -260,6 +308,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, immediate = Immediate}, Content, State = #ch{ virtual_host = VHostPath}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. @@ -273,7 +322,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, content = DecodedContent, persistent_key = PersistentKey}, - rabbit_exchange:route(Exchange, RoutingKey), State)}; + rabbit_exchange:route(Exchange, RoutingKey, DecodedContent), State)}; handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, @@ -286,9 +335,10 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, true -> ok end, {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), - Participants = ack(State#ch.proxy_pid, TxnKey, Acked), + Participants = ack(TxnKey, Acked), {noreply, case TxnKey of - none -> State#ch{unacked_message_q = Remaining}; + none -> ok = notify_limiter(State#ch.limiter_pid, Acked), + State#ch{unacked_message_q = Remaining}; _ -> NewUAQ = queue:join(State#ch.uncommitted_ack_q, Acked), add_tx_participants( @@ -299,12 +349,13 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, - _, State = #ch{ proxy_pid = ProxyPid, writer_pid = WriterPid, + _, State = #ch{ writer_pid = WriterPid, next_tag = DeliveryTag }) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), + check_read_permitted(QueueName, State), case rabbit_amqqueue:with_or_die( QueueName, - fun (Q) -> rabbit_amqqueue:basic_get(Q, ProxyPid, NoAck) end) of + fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, Msg = {_QName, _QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, @@ -330,12 +381,13 @@ handle_method(#'basic.consume'{queue = QueueNameBin, no_ack = NoAck, exclusive = ExclusiveConsume, nowait = NoWait}, - _, State = #ch{ proxy_pid = ProxyPid, - reader_pid = ReaderPid, + _, State = #ch{ reader_pid = ReaderPid, + limiter_pid = LimiterPid, consumer_mapping = ConsumerMapping }) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), + check_read_permitted(QueueName, State), ActualConsumerTag = case ConsumerTag of <<>> -> rabbit_guid:binstring_guid("amq.ctag"); @@ -349,7 +401,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, QueueName, fun (Q) -> rabbit_amqqueue:basic_consume( - Q, NoAck, ReaderPid, ProxyPid, + Q, NoAck, ReaderPid, self(), LimiterPid, ActualConsumerTag, ExclusiveConsume, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})) @@ -380,8 +432,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, - _, State = #ch{ proxy_pid = ProxyPid, - consumer_mapping = ConsumerMapping }) -> + _, State = #ch{consumer_mapping = ConsumerMapping }) -> OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, case dict:find(ConsumerTag, ConsumerMapping) of error -> @@ -402,7 +453,7 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, %% cancel_ok ourselves it might overtake a %% message sent previously by the queue. rabbit_amqqueue:basic_cancel( - Q, ProxyPid, ConsumerTag, + Q, self(), ConsumerTag, ok_msg(NoWait, #'basic.cancel_ok'{ consumer_tag = ConsumerTag})) end) of @@ -414,13 +465,34 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, end end; -handle_method(#'basic.qos'{}, _, State) -> - %% FIXME: Need to implement QOS - {reply, #'basic.qos_ok'{}, State}; +handle_method(#'basic.qos'{global = true}, _, _State) -> + rabbit_misc:protocol_error(not_implemented, "global=true", []); + +handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> + rabbit_misc:protocol_error(not_implemented, + "prefetch_size!=0 (~w)", [Size]); + +handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, + _, State = #ch{ limiter_pid = LimiterPid }) -> + NewLimiterPid = case {LimiterPid, PrefetchCount} of + {undefined, 0} -> + undefined; + {undefined, _} -> + LPid = rabbit_limiter:start_link(self()), + ok = limit_queues(LPid, State), + LPid; + {_, 0} -> + ok = rabbit_limiter:shutdown(LimiterPid), + ok = limit_queues(undefined, State), + undefined; + {_, _} -> + LimiterPid + end, + ok = rabbit_limiter:limit(NewLimiterPid, PrefetchCount), + {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = NewLimiterPid}}; handle_method(#'basic.recover'{requeue = true}, _, State = #ch{ transaction_id = none, - proxy_pid = ProxyPid, unacked_message_q = UAMQ }) -> ok = fold_per_queue( fun (QPid, MsgIds, ok) -> @@ -429,14 +501,13 @@ handle_method(#'basic.recover'{requeue = true}, %% order. To keep it happy we reverse the id list %% since we are given them in reverse order. rabbit_amqqueue:requeue( - QPid, lists:reverse(MsgIds), ProxyPid) + QPid, lists:reverse(MsgIds), self()) end, ok, UAMQ), %% No answer required, apparently! {noreply, State#ch{unacked_message_q = queue:new()}}; handle_method(#'basic.recover'{requeue = false}, _, State = #ch{ transaction_id = none, - proxy_pid = ProxyPid, writer_pid = WriterPid, unacked_message_q = UAMQ }) -> lists:foreach( @@ -454,8 +525,7 @@ handle_method(#'basic.recover'{requeue = false}, %% %% FIXME: should we allocate a fresh DeliveryTag? ok = internal_deliver( - WriterPid, ProxyPid, - false, ConsumerTag, DeliveryTag, + WriterPid, false, ConsumerTag, DeliveryTag, {QName, QPid, MsgId, true, Message}) end, queue:to_list(UAMQ)), %% No answer required, apparently! @@ -476,6 +546,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, _, State = #ch{ virtual_host = VHostPath }) -> CheckedType = rabbit_exchange:check_type(TypeNameBin), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + check_configure_permitted(ExchangeName, State), X = case rabbit_exchange:lookup(ExchangeName) of {ok, FoundX} -> FoundX; {error, not_found} -> @@ -495,6 +566,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, nowait = NoWait}, _, State = #ch{ virtual_host = VHostPath }) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + check_configure_permitted(ExchangeName, State), X = rabbit_exchange:lookup_or_die(ExchangeName), ok = rabbit_exchange:assert_type(X, rabbit_exchange:check_type(TypeNameBin)), return_ok(State, NoWait, #'exchange.declare_ok'{}); @@ -504,6 +576,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, nowait = NoWait}, _, State = #ch { virtual_host = VHostPath }) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + check_configure_permitted(ExchangeName, State), case rabbit_exchange:delete(ExchangeName, IfUnused) of {error, not_found} -> rabbit_misc:protocol_error( @@ -554,9 +627,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin, Other -> check_name('queue', Other) end, QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), + check_configure_permitted(QueueName, State), Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args)); - Other -> Other + Other = #amqqueue{name = QueueName} -> + check_configure_permitted(QueueName, State), + Other end, return_queue_declare_ok(State, NoWait, Q); @@ -565,6 +641,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, nowait = NoWait}, _, State = #ch{ virtual_host = VHostPath }) -> QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), + check_configure_permitted(QueueName, State), Q = rabbit_amqqueue:with_or_die(QueueName, fun (Q) -> Q end), return_queue_declare_ok(State, NoWait, Q); @@ -575,6 +652,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin, }, _, State) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), + check_configure_permitted(QueueName, State), case rabbit_amqqueue:with_or_die( QueueName, fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of @@ -611,6 +689,7 @@ handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, _, State) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), + check_read_permitted(QueueName, State), {ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die( QueueName, fun (Q) -> rabbit_amqqueue:purge(Q) end), @@ -660,9 +739,11 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, %% FIXME: don't allow binding to internal exchanges - %% including the one named "" ! QueueName = expand_queue_name_shortcut(QueueNameBin, State), + check_write_permitted(QueueName, State), ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + check_read_permitted(ExchangeName, State), case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of {error, exchange_not_found} -> rabbit_misc:protocol_error( @@ -748,10 +829,10 @@ add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) -> State#ch{tx_participants = sets:union(Participants, sets:from_list(MoreP))}. -ack(ProxyPid, TxnKey, UAQ) -> +ack(TxnKey, UAQ) -> fold_per_queue( fun (QPid, MsgIds, L) -> - ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, ProxyPid), + ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, self()), [QPid | L] end, [], UAQ). @@ -766,7 +847,9 @@ internal_commit(State = #ch{transaction_id = TxnKey, tx_participants = Participants}) -> case rabbit_amqqueue:commit_all(sets:to_list(Participants), TxnKey) of - ok -> new_tx(State); + ok -> ok = notify_limiter(State#ch.limiter_pid, + State#ch.uncommitted_ack_q), + new_tx(State); {error, Errors} -> rabbit_misc:protocol_error( internal_error, "commit failed: ~w", [Errors]) end. @@ -803,19 +886,37 @@ fold_per_queue(F, Acc0, UAQ) -> dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). -notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) -> - rabbit_amqqueue:notify_down_all( - [QPid || QueueName <- - sets:to_list( - dict:fold(fun (_ConsumerTag, QueueName, S) -> - sets:add_element(QueueName, S) - end, sets:new(), Consumers)), - case rabbit_amqqueue:lookup(QueueName) of - {ok, Q} -> QPid = Q#amqqueue.pid, true; - %% queue has been deleted in the meantime - {error, not_found} -> QPid = none, false - end], - ProxyPid). +notify_queues(#ch{consumer_mapping = Consumers}) -> + rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()). + +limit_queues(LPid, #ch{consumer_mapping = Consumers}) -> + rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid). + +consumer_queues(Consumers) -> + [QPid || QueueName <- + sets:to_list( + dict:fold(fun (_ConsumerTag, QueueName, S) -> + sets:add_element(QueueName, S) + end, sets:new(), Consumers)), + case rabbit_amqqueue:lookup(QueueName) of + {ok, Q} -> QPid = Q#amqqueue.pid, true; + %% queue has been deleted in the meantime + {error, not_found} -> QPid = none, false + end]. + +%% tell the limiter about the number of acks that have been received +%% for messages delivered to subscribed consumers, but not acks for +%% messages sent in a response to a basic.get (identified by their +%% 'none' consumer tag) +notify_limiter(undefined, _Acked) -> + ok; +notify_limiter(LimiterPid, Acked) -> + case lists:foldl(fun ({_, none, _}, Acc) -> Acc; + ({_, _, _}, Acc) -> Acc + 1 + end, 0, queue:to_list(Acked)) of + 0 -> ok; + Count -> rabbit_limiter:ack(LimiterPid, Count) + end. is_message_persistent(#content{properties = #'P_basic'{ delivery_mode = Mode}}) -> @@ -823,7 +924,8 @@ is_message_persistent(#content{properties = #'P_basic'{ 1 -> false; 2 -> true; undefined -> false; - Other -> rabbit_log:warning("Unknown delivery mode ~p - treating as 1, non-persistent~n", + Other -> rabbit_log:warning("Unknown delivery mode ~p - " + "treating as 1, non-persistent~n", [Other]), false end. @@ -833,7 +935,7 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> lock_message(false, _MsgStruct, State) -> State. -internal_deliver(WriterPid, ChPid, Notify, ConsumerTag, DeliveryTag, +internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, @@ -845,6 +947,6 @@ internal_deliver(WriterPid, ChPid, Notify, ConsumerTag, DeliveryTag, routing_key = RoutingKey}, ok = case Notify of true -> rabbit_writer:send_command_and_notify( - WriterPid, QPid, ChPid, M, Content); + WriterPid, QPid, self(), M, Content); false -> rabbit_writer:send_command(WriterPid, M, Content) end. diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 352d7e75..6649899a 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -127,10 +127,10 @@ Available commands: delete_vhost <VHostPath> list_vhosts - map_user_vhost <UserName> <VHostPath> - unmap_user_vhost <UserName> <VHostPath> - list_user_vhosts <UserName> - list_vhost_users <VHostPath> + set_permissions [-p <VHostPath>] <UserName> <Regexp> <Regexp> <Regexp> + clear_permissions [-p <VHostPath>] <UserName> + list_permissions [-p <VHostPath>] + list_user_permissions <UserName> list_queues [-p <VHostPath>] [<QueueInfoItem> ...] list_exchanges [-p <VHostPath>] [<ExchangeInfoItem> ...] @@ -236,25 +236,14 @@ action(list_vhosts, Node, [], Inform) -> Inform("Listing vhosts", []), display_list(call(Node, {rabbit_access_control, list_vhosts, []})); -action(map_user_vhost, Node, Args = [_Username, _VHostPath], Inform) -> - Inform("Mapping user ~p to vhost ~p", Args), - call(Node, {rabbit_access_control, map_user_vhost, Args}); - -action(unmap_user_vhost, Node, Args = [_Username, _VHostPath], Inform) -> - Inform("Unmapping user ~p from vhost ~p", Args), - call(Node, {rabbit_access_control, unmap_user_vhost, Args}); - -action(list_user_vhosts, Node, Args = [_Username], Inform) -> - Inform("Listing vhosts for user ~p", Args), - display_list(call(Node, {rabbit_access_control, list_user_vhosts, Args})); - -action(list_vhost_users, Node, Args = [_VHostPath], Inform) -> - Inform("Listing users for vhosts ~p", Args), - display_list(call(Node, {rabbit_access_control, list_vhost_users, Args})); +action(list_user_permissions, Node, Args = [_Username], Inform) -> + Inform("Listing permissions for user ~p", Args), + display_list(call(Node, {rabbit_access_control, list_user_permissions, + Args})); action(list_queues, Node, Args, Inform) -> Inform("Listing queues", []), - {VHostArg, RemainingArgs} = parse_vhost_flag(Args), + {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args), ArgAtoms = list_replace(node, pid, default_if_empty(RemainingArgs, [name, messages])), display_info_list(rpc_call(Node, rabbit_amqqueue, info_all, @@ -263,7 +252,7 @@ action(list_queues, Node, Args, Inform) -> action(list_exchanges, Node, Args, Inform) -> Inform("Listing exchanges", []), - {VHostArg, RemainingArgs} = parse_vhost_flag(Args), + {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args), ArgAtoms = default_if_empty(RemainingArgs, [name, type]), display_info_list(rpc_call(Node, rabbit_exchange, info_all, [VHostArg, ArgAtoms]), @@ -271,7 +260,7 @@ action(list_exchanges, Node, Args, Inform) -> action(list_bindings, Node, Args, Inform) -> Inform("Listing bindings", []), - {VHostArg, _} = parse_vhost_flag(Args), + {VHostArg, _} = parse_vhost_flag_bin(Args), InfoKeys = [exchange_name, routing_key, queue_name, args], display_info_list( [lists:zip(InfoKeys, tuple_to_list(X)) || @@ -285,15 +274,37 @@ action(list_connections, Node, Args, Inform) -> default_if_empty(Args, [user, peer_address, peer_port])), display_info_list(rpc_call(Node, rabbit_networking, connection_info_all, [ArgAtoms]), - ArgAtoms). + ArgAtoms); + +action(Command, Node, Args, Inform) -> + {VHost, RemainingArgs} = parse_vhost_flag(Args), + action(Command, Node, VHost, RemainingArgs, Inform). + +action(set_permissions, Node, VHost, [Username, CPerm, WPerm, RPerm], Inform) -> + Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]), + call(Node, {rabbit_access_control, set_permissions, + [Username, VHost, CPerm, WPerm, RPerm]}); + +action(clear_permissions, Node, VHost, [Username], Inform) -> + Inform("Clearing permissions for user ~p in vhost ~p", [Username, VHost]), + call(Node, {rabbit_access_control, clear_permissions, [Username, VHost]}); + +action(list_permissions, Node, VHost, [], Inform) -> + Inform("Listing permissions in vhost ~p", [VHost]), + display_list(call(Node, {rabbit_access_control, list_vhost_permissions, + [VHost]})). parse_vhost_flag(Args) when is_list(Args) -> - case Args of - ["-p", VHost | RemainingArgs] -> - {list_to_binary(VHost), RemainingArgs}; - RemainingArgs -> - {<<"/">>, RemainingArgs} - end. + case Args of + ["-p", VHost | RemainingArgs] -> + {VHost, RemainingArgs}; + RemainingArgs -> + {"/", RemainingArgs} + end. + +parse_vhost_flag_bin(Args) -> + {VHost, RemainingArgs} = parse_vhost_flag(Args), + {list_to_binary(VHost), RemainingArgs}. default_if_empty(List, Default) when is_list(List) -> if List == [] -> @@ -303,21 +314,17 @@ default_if_empty(List, Default) when is_list(List) -> end. display_info_list(Results, InfoItemKeys) when is_list(Results) -> - lists:foreach( - fun (Result) -> - io:fwrite( - lists:flatten( - rabbit_misc:intersperse( - "\t", - [format_info_item(Result, X) || X <- InfoItemKeys]))), - io:nl() - end, - Results), + lists:foreach(fun (Result) -> display_row([format_info_item(Result, X) || + X <- InfoItemKeys]) + end, Results), ok; - display_info_list(Other, _) -> Other. +display_row(Row) -> + io:fwrite(lists:flatten(rabbit_misc:intersperse("\t", Row))), + io:nl(). + format_info_item(Items, Key) -> {value, Info = {Key, Value}} = lists:keysearch(Key, 1, Items), case Info of @@ -334,8 +341,10 @@ format_info_item(Items, Key) -> end. display_list(L) when is_list(L) -> - lists:foreach(fun (I) -> - io:format("~s~n", [binary_to_list(I)]) + lists:foreach(fun (I) when is_binary(I) -> + io:format("~s~n", [url_encode(I)]); + (I) when is_tuple(I) -> + display_row([url_encode(V) || V <- tuple_to_list(I)]) end, lists:sort(L)), ok; diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl index 9a9220b5..183b6984 100644 --- a/src/rabbit_error_logger_file_h.erl +++ b/src/rabbit_error_logger_file_h.erl @@ -46,7 +46,7 @@ init({{File, Suffix}, []}) -> case rabbit_misc:append_file(File, Suffix) of ok -> ok; {error, Error} -> - rabbit_log:error("Failed to append contents of " ++ + rabbit_log:error("Failed to append contents of " "log file '~s' to '~s':~n~p~n", [File, [File, Suffix], Error]) end, diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 7f3a78e9..fc89cfca 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -37,11 +37,11 @@ -export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info/1, info/2, info_all/1, info_all/2, simple_publish/6, simple_publish/3, - route/2]). + route/3]). -export([add_binding/4, delete_binding/4, list_bindings/1]). -export([delete/2]). -export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). --export([check_type/1, assert_type/2, topic_matches/2]). +-export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]). %% EXTENDED API -export([list_exchange_bindings/1]). @@ -79,7 +79,7 @@ (bool(), bool(), exchange_name(), routing_key(), binary(), binary()) -> publish_res()). -spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()). --spec(route/2 :: (exchange(), routing_key()) -> [pid()]). +-spec(route/3 :: (exchange(), routing_key(), decoded_content()) -> [pid()]). -spec(add_binding/4 :: (exchange_name(), queue_name(), routing_key(), amqp_table()) -> bind_res() | {'error', 'durability_settings_incompatible'}). @@ -91,6 +91,7 @@ -spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok'). -spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok'). -spec(topic_matches/2 :: (binary(), binary()) -> bool()). +-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> bool()). -spec(delete/2 :: (exchange_name(), bool()) -> 'ok' | not_found() | {'error', 'in_use'}). -spec(list_queue_bindings/1 :: (queue_name()) -> @@ -106,14 +107,16 @@ recover() -> ok = rabbit_misc:table_foreach( - fun(Exchange) -> ok = mnesia:write(Exchange) end, - durable_exchanges), + fun(Exchange) -> ok = mnesia:write(rabbit_exchange, + Exchange, write) + end, rabbit_durable_exchange), ok = rabbit_misc:table_foreach( fun(Route) -> {_, ReverseRoute} = route_with_reverse(Route), - ok = mnesia:write(Route), - ok = mnesia:write(ReverseRoute) - end, durable_routes), - ok. + ok = mnesia:write(rabbit_route, + Route, write), + ok = mnesia:write(rabbit_reverse_route, + ReverseRoute, write) + end, rabbit_durable_route). declare(ExchangeName, Type, Durable, AutoDelete, Args) -> Exchange = #exchange{name = ExchangeName, @@ -123,11 +126,11 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) -> arguments = Args}, rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({exchange, ExchangeName}) of - [] -> ok = mnesia:write(Exchange), + case mnesia:wread({rabbit_exchange, ExchangeName}) of + [] -> ok = mnesia:write(rabbit_exchange, Exchange, write), if Durable -> - ok = mnesia:write( - durable_exchanges, Exchange, write); + ok = mnesia:write(rabbit_durable_exchange, + Exchange, write); true -> ok end, Exchange; @@ -141,6 +144,8 @@ check_type(<<"direct">>) -> direct; check_type(<<"topic">>) -> topic; +check_type(<<"headers">>) -> + headers; check_type(T) -> rabbit_misc:protocol_error( command_invalid, "invalid exchange type '~s'", [T]). @@ -154,7 +159,7 @@ assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) -> [rabbit_misc:rs(Name), ActualType, RequiredType]). lookup(Name) -> - rabbit_misc:dirty_read({exchange, Name}). + rabbit_misc:dirty_read({rabbit_exchange, Name}). lookup_or_die(Name) -> case lookup(Name) of @@ -166,6 +171,7 @@ lookup_or_die(Name) -> list(VHostPath) -> mnesia:dirty_match_object( + rabbit_exchange, #exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}). map(VHostPath, F) -> @@ -207,64 +213,80 @@ simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, %% Usable by Erlang code that wants to publish messages. simple_publish(Mandatory, Immediate, Message = #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey}) -> + routing_key = RoutingKey, + content = Content}) -> case lookup(ExchangeName) of {ok, Exchange} -> - QPids = route(Exchange, RoutingKey), + QPids = route(Exchange, RoutingKey, Content), rabbit_router:deliver(QPids, Mandatory, Immediate, none, Message); {error, Error} -> {error, Error} end. +sort_arguments(Arguments) -> + lists:keysort(1, Arguments). + %% return the list of qpids to which a message with a given routing %% key, sent to a particular exchange, should be delivered. %% %% The function ensures that a qpid appears in the return list exactly %% as many times as a message should be delivered to it. With the %% current exchange types that is at most once. -%% +route(X = #exchange{type = topic}, RoutingKey, _Content) -> + match_bindings(X, fun (#binding{key = BindingKey}) -> + topic_matches(BindingKey, RoutingKey) + end); + +route(X = #exchange{type = headers}, _RoutingKey, Content) -> + Headers = case (Content#content.properties)#'P_basic'.headers of + undefined -> []; + H -> sort_arguments(H) + end, + match_bindings(X, fun (#binding{args = Spec}) -> + headers_match(Spec, Headers) + end); + +route(X = #exchange{type = fanout}, _RoutingKey, _Content) -> + match_routing_key(X, '_'); + +route(X = #exchange{type = direct}, RoutingKey, _Content) -> + match_routing_key(X, RoutingKey). + %% TODO: Maybe this should be handled by a cursor instead. -route(#exchange{name = Name, type = topic}, RoutingKey) -> - Query = qlc:q([QName || - #route{binding = #binding{ - exchange_name = ExchangeName, - queue_name = QName, - key = BindingKey}} <- mnesia:table(route), - ExchangeName == Name, - %% TODO: This causes a full scan for each entry - %% with the same exchange (see bug 19336) - topic_matches(BindingKey, RoutingKey)]), +%% TODO: This causes a full scan for each entry with the same exchange +match_bindings(#exchange{name = Name}, Match) -> + Query = qlc:q([QName || #route{binding = Binding = #binding{ + exchange_name = ExchangeName, + queue_name = QName}} <- + mnesia:table(rabbit_route), + ExchangeName == Name, + Match(Binding)]), lookup_qpids( try mnesia:async_dirty(fun qlc:e/1, [Query]) catch exit:{aborted, {badarg, _}} -> %% work around OTP-7025, which was fixed in R12B-1, by %% falling back on a less efficient method - [QName || #route{binding = #binding{queue_name = QName, - key = BindingKey}} <- + [QName || #route{binding = Binding = #binding{ + queue_name = QName}} <- mnesia:dirty_match_object( + rabbit_route, #route{binding = #binding{exchange_name = Name, _ = '_'}}), - topic_matches(BindingKey, RoutingKey)] - end); - -route(X = #exchange{type = fanout}, _) -> - route_internal(X, '_'); - -route(X = #exchange{type = direct}, RoutingKey) -> - route_internal(X, RoutingKey). + Match(Binding)] + end). -route_internal(#exchange{name = Name}, RoutingKey) -> +match_routing_key(#exchange{name = Name}, RoutingKey) -> MatchHead = #route{binding = #binding{exchange_name = Name, queue_name = '$1', key = RoutingKey, _ = '_'}}, - lookup_qpids(mnesia:dirty_select(route, [{MatchHead, [], ['$1']}])). + lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). lookup_qpids(Queues) -> sets:fold( fun(Key, Acc) -> - case mnesia:dirty_read({amqqueue, Key}) of + case mnesia:dirty_read({rabbit_queue, Key}) of [#amqqueue{pid = QPid}] -> [QPid | Acc]; [] -> Acc end @@ -275,10 +297,16 @@ lookup_qpids(Queues) -> %% to be implemented for 0.91 ? delete_exchange_bindings(ExchangeName) -> - indexed_delete( - #route{binding = #binding{exchange_name = ExchangeName, - _ = '_'}}, - fun delete_forward_routes/1, fun mnesia:delete_object/1). + [begin + ok = mnesia:delete_object(rabbit_reverse_route, + reverse_route(Route), write), + ok = delete_forward_routes(Route) + end || Route <- mnesia:match_object( + rabbit_route, + #route{binding = #binding{exchange_name = ExchangeName, + _ = '_'}}, + write)], + ok. delete_queue_bindings(QueueName) -> delete_queue_bindings(QueueName, fun delete_forward_routes/1). @@ -288,29 +316,27 @@ delete_transient_queue_bindings(QueueName) -> delete_queue_bindings(QueueName, FwdDeleteFun) -> Exchanges = exchanges_for_queue(QueueName), - indexed_delete( - reverse_route(#route{binding = #binding{queue_name = QueueName, - _ = '_'}}), - fun mnesia:delete_object/1, FwdDeleteFun), [begin - [X] = mnesia:read({exchange, ExchangeName}), + ok = FwdDeleteFun(reverse_route(Route)), + ok = mnesia:delete_object(rabbit_reverse_route, Route, write) + end || Route <- mnesia:match_object( + rabbit_reverse_route, + reverse_route( + #route{binding = #binding{queue_name = QueueName, + _ = '_'}}), + write)], + [begin + [X] = mnesia:read({rabbit_exchange, ExchangeName}), ok = maybe_auto_delete(X) end || ExchangeName <- Exchanges], ok. -indexed_delete(Match, ForwardsDeleteFun, ReverseDeleteFun) -> - [begin - ok = ReverseDeleteFun(reverse_route(Route)), - ok = ForwardsDeleteFun(Route) - end || Route <- mnesia:match_object(Match)], - ok. - delete_forward_routes(Route) -> - ok = mnesia:delete_object(Route), - ok = mnesia:delete_object(durable_routes, Route, write). + ok = mnesia:delete_object(rabbit_route, Route, write), + ok = mnesia:delete_object(rabbit_durable_route, Route, write). delete_transient_forward_routes(Route) -> - ok = mnesia:delete_object(Route). + ok = mnesia:delete_object(rabbit_route, Route, write). exchanges_for_queue(QueueName) -> MatchHead = reverse_route( @@ -319,7 +345,7 @@ exchanges_for_queue(QueueName) -> _ = '_'}}), sets:to_list( sets:from_list( - mnesia:select(reverse_route, [{MatchHead, [], ['$1']}]))). + mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))). contains(Table, MatchHead) -> try @@ -339,7 +365,7 @@ continue({[], Continuation}) -> continue(mnesia:select(Continuation)). call_with_exchange(Exchange, Fun) -> rabbit_misc:execute_mnesia_transaction( - fun() -> case mnesia:read({exchange, Exchange}) of + fun() -> case mnesia:read({rabbit_exchange, Exchange}) of [] -> {error, not_found}; [X] -> Fun(X) end @@ -347,8 +373,8 @@ call_with_exchange(Exchange, Fun) -> call_with_exchange_and_queue(Exchange, Queue, Fun) -> rabbit_misc:execute_mnesia_transaction( - fun() -> case {mnesia:read({exchange, Exchange}), - mnesia:read({amqqueue, Queue})} of + fun() -> case {mnesia:read({rabbit_exchange, Exchange}), + mnesia:read({rabbit_queue, Queue})} of {[X], [Q]} -> Fun(X, Q); {[ ], [_]} -> {error, exchange_not_found}; {[_], [ ]} -> {error, queue_not_found}; @@ -382,13 +408,15 @@ sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) -> Binding = #binding{exchange_name = ExchangeName, queue_name = QueueName, key = RoutingKey, - args = Arguments}, + args = sort_arguments(Arguments)}, ok = case Durable of - true -> Fun(durable_routes, #route{binding = Binding}, write); + true -> Fun(rabbit_durable_route, + #route{binding = Binding}, write); false -> ok end, - [ok, ok] = [Fun(element(1, R), R, write) || - R <- tuple_to_list(route_with_reverse(Binding))], + {Route, ReverseRoute} = route_with_reverse(Binding), + ok = Fun(rabbit_route, Route, write), + ok = Fun(rabbit_reverse_route, ReverseRoute, write), ok. list_bindings(VHostPath) -> @@ -399,6 +427,7 @@ list_bindings(VHostPath) -> queue_name = QueueName, args = Arguments}} <- mnesia:dirty_match_object( + rabbit_route, #route{binding = #binding{ exchange_name = rabbit_misc:r(VHostPath, exchange), _ = '_'}, @@ -434,6 +463,67 @@ reverse_binding(#binding{exchange_name = Exchange, key = Key, args = Args}. +default_headers_match_kind() -> all. + +parse_x_match(<<"all">>) -> all; +parse_x_match(<<"any">>) -> any; +parse_x_match(Other) -> + rabbit_log:warning("Invalid x-match field value ~p; expected all or any", + [Other]), + default_headers_match_kind(). + +%% Horrendous matching algorithm. Depends for its merge-like +%% (linear-time) behaviour on the lists:keysort (sort_arguments) that +%% route/3 and sync_binding/6 do. +%% +%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. +%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +%% +headers_match(Pattern, Data) -> + MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of + {value, {_, longstr, MK}} -> parse_x_match(MK); + {value, {_, Type, MK}} -> + rabbit_log:warning("Invalid x-match field type ~p " + "(value ~p); expected longstr", + [Type, MK]), + default_headers_match_kind(); + _ -> default_headers_match_kind() + end, + headers_match(Pattern, Data, true, false, MatchKind). + +headers_match([], _Data, AllMatch, _AnyMatch, all) -> + AllMatch; +headers_match([], _Data, _AllMatch, AnyMatch, any) -> + AnyMatch; +headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data, + AllMatch, AnyMatch, MatchKind) -> + headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind); +headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) -> + headers_match([], [], false, AnyMatch, MatchKind); +headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest], + AllMatch, AnyMatch, MatchKind) when PK > DK -> + headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind); +headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _], + _AllMatch, AnyMatch, MatchKind) when PK < DK -> + headers_match(PRest, Data, false, AnyMatch, MatchKind); +headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], + AllMatch, AnyMatch, MatchKind) when PK == DK -> + {AllMatch1, AnyMatch1} = + if + %% It's not properly specified, but a "no value" in a + %% pattern field is supposed to mean simple presence of + %% the corresponding data field. I've interpreted that to + %% mean a type of "void" for the pattern field. + PT == void -> {AllMatch, true}; + %% Similarly, it's not specified, but I assume that a + %% mismatched type causes a mismatched value. + PT =/= DT -> {false, AnyMatch}; + PV == DV -> {AllMatch, true}; + true -> {false, AnyMatch} + end, + headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). + split_topic_key(Key) -> {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."), KeySplit. @@ -477,15 +567,15 @@ conditional_delete(Exchange = #exchange{name = ExchangeName}) -> %% we need to check for durable routes here too in case a bunch of %% routes to durable queues have been removed temporarily as a %% result of a node failure - case contains(route, Match) orelse contains(durable_routes, Match) of + case contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match) of false -> unconditional_delete(Exchange); true -> {error, in_use} end. unconditional_delete(#exchange{name = ExchangeName}) -> ok = delete_exchange_bindings(ExchangeName), - ok = mnesia:delete({durable_exchanges, ExchangeName}), - ok = mnesia:delete({exchange, ExchangeName}). + ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}), + ok = mnesia:delete({rabbit_exchange, ExchangeName}). %%---------------------------------------------------------------------------- %% EXTENDED API @@ -501,7 +591,7 @@ list_exchange_bindings(ExchangeName) -> #route{binding = #binding{queue_name = QueueName, key = RoutingKey, args = Arguments}} - <- mnesia:dirty_match_object(Route)]. + <- mnesia:dirty_match_object(rabbit_route, Route)]. % Refactoring is left as an exercise for the reader list_queue_bindings(QueueName) -> @@ -511,4 +601,4 @@ list_queue_bindings(QueueName) -> #route{binding = #binding{exchange_name = ExchangeName, key = RoutingKey, args = Arguments}} - <- mnesia:dirty_match_object(Route)]. + <- mnesia:dirty_match_object(rabbit_route, Route)]. diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index 060bed48..5c447792 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -95,13 +95,15 @@ collect_content(ChannelPid, MethodName) -> true -> rabbit_misc:protocol_error( command_invalid, - "expected content header for class ~w, got one for class ~w instead", + "expected content header for class ~w, " + "got one for class ~w instead", [ClassId, HeaderClassId]) end; _ -> rabbit_misc:protocol_error( command_invalid, - "expected content header for class ~w, got non content header frame instead", + "expected content header for class ~w, " + "got non content header frame instead", [ClassId]) end. diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index 51c1665b..2be00503 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -82,7 +82,8 @@ guid() -> %% and time. We combine that with a process-local counter to give %% us a GUID that is monotonically increasing per process. G = case get(guid) of - undefined -> {{gen_server:call(?SERVER, serial), self()}, 0}; + undefined -> {{gen_server:call(?SERVER, serial, infinity), self()}, + 0}; {S, I} -> {S, I+1} end, put(guid, G), diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl new file mode 100644 index 00000000..3f9b6ebb --- /dev/null +++ b/src/rabbit_limiter.erl @@ -0,0 +1,195 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_limiter). + +-behaviour(gen_server). + +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, + handle_info/2]). +-export([start_link/1, shutdown/1]). +-export([limit/2, can_send/2, ack/2, register/2, unregister/2]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(maybe_pid() :: pid() | 'undefined'). + +-spec(start_link/1 :: (pid()) -> pid()). +-spec(shutdown/1 :: (maybe_pid()) -> 'ok'). +-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). +-spec(can_send/2 :: (maybe_pid(), pid()) -> bool()). +-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). +-spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). +-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +-record(lim, {prefetch_count = 0, + ch_pid, + queues = dict:new(), % QPid -> {MonitorRef, Notify} + volume = 0}). +%% 'Notify' is a boolean that indicates whether a queue should be +%% notified of a change in the limit or volume that may allow it to +%% deliver more messages via the limiter's channel. + +%%---------------------------------------------------------------------------- +%% API +%%---------------------------------------------------------------------------- + +start_link(ChPid) -> + {ok, Pid} = gen_server:start_link(?MODULE, [ChPid], []), + Pid. + +shutdown(undefined) -> + ok; +shutdown(LimiterPid) -> + unlink(LimiterPid), + gen_server2:cast(LimiterPid, shutdown). + +limit(undefined, 0) -> + ok; +limit(LimiterPid, PrefetchCount) -> + gen_server2:cast(LimiterPid, {limit, PrefetchCount}). + +%% Ask the limiter whether the queue can deliver a message without +%% breaching a limit +can_send(undefined, _QPid) -> + true; +can_send(LimiterPid, QPid) -> + rabbit_misc:with_exit_handler( + fun () -> true end, + fun () -> gen_server2:call(LimiterPid, {can_send, QPid}, infinity) end). + +%% Let the limiter know that the channel has received some acks from a +%% consumer +ack(undefined, _Count) -> ok; +ack(LimiterPid, Count) -> gen_server2:cast(LimiterPid, {ack, Count}). + +register(undefined, _QPid) -> ok; +register(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {register, QPid}). + +unregister(undefined, _QPid) -> ok; +unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid}). + +%%---------------------------------------------------------------------------- +%% gen_server callbacks +%%---------------------------------------------------------------------------- + +init([ChPid]) -> + {ok, #lim{ch_pid = ChPid} }. + +handle_call({can_send, QPid}, _From, State = #lim{volume = Volume}) -> + case limit_reached(State) of + true -> {reply, false, limit_queue(QPid, State)}; + false -> {reply, true, State#lim{volume = Volume + 1}} + end. + +handle_cast(shutdown, State) -> + {stop, normal, State}; + +handle_cast({limit, PrefetchCount}, State) -> + {noreply, maybe_notify(State, State#lim{prefetch_count = PrefetchCount})}; + +handle_cast({ack, Count}, State = #lim{volume = Volume}) -> + NewVolume = if Volume == 0 -> 0; + true -> Volume - Count + end, + {noreply, maybe_notify(State, State#lim{volume = NewVolume})}; + +handle_cast({register, QPid}, State) -> + {noreply, remember_queue(QPid, State)}; + +handle_cast({unregister, QPid}, State) -> + {noreply, forget_queue(QPid, State)}. + +handle_info({'DOWN', _MonitorRef, _Type, QPid, _Info}, State) -> + {noreply, forget_queue(QPid, State)}. + +terminate(_, _) -> + ok. + +code_change(_, State, _) -> + State. + +%%---------------------------------------------------------------------------- +%% Internal plumbing +%%---------------------------------------------------------------------------- + +maybe_notify(OldState, NewState) -> + case limit_reached(OldState) andalso not(limit_reached(NewState)) of + true -> notify_queues(NewState); + false -> NewState + end. + +limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> + Limit =/= 0 andalso Volume >= Limit. + +remember_queue(QPid, State = #lim{queues = Queues}) -> + case dict:is_key(QPid, Queues) of + false -> MRef = erlang:monitor(process, QPid), + State#lim{queues = dict:store(QPid, {MRef, false}, Queues)}; + true -> State + end. + +forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) -> + case dict:find(QPid, Queues) of + {ok, {MRef, _}} -> + true = erlang:demonitor(MRef), + ok = rabbit_amqqueue:unblock(QPid, ChPid), + State#lim{queues = dict:erase(QPid, Queues)}; + error -> State + end. + +limit_queue(QPid, State = #lim{queues = Queues}) -> + UpdateFun = fun ({MRef, _}) -> {MRef, true} end, + State#lim{queues = dict:update(QPid, UpdateFun, Queues)}. + +notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> + {QList, NewQueues} = + dict:fold(fun (_QPid, {_, false}, Acc) -> Acc; + (QPid, {MRef, true}, {L, D}) -> + {[QPid | L], dict:store(QPid, {MRef, false}, D)} + end, {[], Queues}, Queues), + case length(QList) of + 0 -> ok; + L -> + %% We randomly vary the position of queues in the list, + %% thus ensuring that each queue has an equal chance of + %% being notified first. + {L1, L2} = lists:split(random:uniform(L), QList), + [ok = rabbit_amqqueue:unblock(Q, ChPid) || Q <- L2 ++ L1], + ok + end, + State#lim{queues = NewQueues}. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 1fcd9a61..eced0b3c 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -50,6 +50,7 @@ -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). +-export([start_applications/1, stop_applications/1]). -import(mnesia). -import(lists). @@ -106,6 +107,8 @@ -spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}). -spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok'). -spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). +-spec(start_applications/1 :: ([atom()]) -> 'ok'). +-spec(stop_applications/1 :: ([atom()]) -> 'ok'). -endif. @@ -232,7 +235,7 @@ filter_exit_map(F, L) -> with_user(Username, Thunk) -> fun () -> - case mnesia:read({user, Username}) of + case mnesia:read({rabbit_user, Username}) of [] -> mnesia:abort({no_such_user, Username}); [_U] -> @@ -242,7 +245,7 @@ with_user(Username, Thunk) -> with_vhost(VHostPath, Thunk) -> fun () -> - case mnesia:read({vhost, VHostPath}) of + case mnesia:read({rabbit_vhost, VHostPath}) of [] -> mnesia:abort({no_such_vhost, VHostPath}); [_V] -> @@ -385,3 +388,32 @@ format_stderr(Fmt, Args) -> io:format(Fmt, Args) end, ok. + +manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) -> + Iterate(fun (App, Acc) -> + case Do(App) of + ok -> [App | Acc]; + {error, {SkipError, _}} -> Acc; + {error, Reason} -> + lists:foreach(Undo, Acc), + throw({error, {ErrorTag, App, Reason}}) + end + end, [], Apps), + ok. + +start_applications(Apps) -> + manage_applications(fun lists:foldl/3, + fun application:start/1, + fun application:stop/1, + already_started, + cannot_start_application, + Apps). + +stop_applications(Apps) -> + manage_applications(fun lists:foldr/3, + fun application:stop/1, + fun application:start/1, + not_started, + cannot_stop_application, + Apps). + diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 0c573073..575ecb0a 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -101,33 +101,50 @@ force_reset() -> reset(true). %%-------------------------------------------------------------------- table_definitions() -> - [{user, [{disc_copies, [node()]}, - {attributes, record_info(fields, user)}]}, - {user_vhost, [{type, bag}, - {disc_copies, [node()]}, - {attributes, record_info(fields, user_vhost)}, - {index, [virtual_host]}]}, - {vhost, [{disc_copies, [node()]}, - {attributes, record_info(fields, vhost)}]}, - {rabbit_config, [{disc_copies, [node()]}]}, - {listener, [{type, bag}, - {attributes, record_info(fields, listener)}]}, - {durable_routes, [{disc_copies, [node()]}, - {record_name, route}, - {attributes, record_info(fields, route)}]}, - {route, [{type, ordered_set}, - {attributes, record_info(fields, route)}]}, - {reverse_route, [{type, ordered_set}, - {attributes, record_info(fields, reverse_route)}]}, - {durable_exchanges, [{disc_copies, [node()]}, - {record_name, exchange}, - {attributes, record_info(fields, exchange)}]}, - {exchange, [{attributes, record_info(fields, exchange)}]}, - {durable_queues, [{disc_copies, [node()]}, - {record_name, amqqueue}, - {attributes, record_info(fields, amqqueue)}]}, - {amqqueue, [{attributes, record_info(fields, amqqueue)}, - {index, [pid]}]}]. + [{rabbit_user, + [{record_name, user}, + {attributes, record_info(fields, user)}, + {disc_copies, [node()]}]}, + {rabbit_user_permission, + [{record_name, user_permission}, + {attributes, record_info(fields, user_permission)}, + {disc_copies, [node()]}]}, + {rabbit_vhost, + [{record_name, vhost}, + {attributes, record_info(fields, vhost)}, + {disc_copies, [node()]}]}, + {rabbit_config, + [{disc_copies, [node()]}]}, + {rabbit_listener, + [{record_name, listener}, + {attributes, record_info(fields, listener)}, + {type, bag}]}, + {rabbit_durable_route, + [{record_name, route}, + {attributes, record_info(fields, route)}, + {disc_copies, [node()]}]}, + {rabbit_route, + [{record_name, route}, + {attributes, record_info(fields, route)}, + {type, ordered_set}]}, + {rabbit_reverse_route, + [{record_name, reverse_route}, + {attributes, record_info(fields, reverse_route)}, + {type, ordered_set}]}, + {rabbit_durable_exchange, + [{record_name, exchange}, + {attributes, record_info(fields, exchange)}, + {disc_copies, [node()]}]}, + {rabbit_exchange, + [{record_name, exchange}, + {attributes, record_info(fields, exchange)}]}, + {rabbit_durable_queue, + [{record_name, amqqueue}, + {attributes, record_info(fields, amqqueue)}, + {disc_copies, [node()]}]}, + {rabbit_queue, + [{record_name, amqqueue}, + {attributes, record_info(fields, amqqueue)}]}]. table_names() -> [Tab || {Tab, _} <- table_definitions()]. @@ -246,8 +263,8 @@ init_db(ClusterNodes) -> %% NB: we cannot use rabbit_log here since %% it may not have been started yet error_logger:warning_msg( - "schema integrity check failed: ~p~n" ++ - "moving database to backup location " ++ + "schema integrity check failed: ~p~n" + "moving database to backup location " "and recreating schema from scratch~n", [Reason]), ok = move_db(), diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 99ea37d8..2dbd5a5a 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -123,6 +123,7 @@ stop_tcp_listener(Host, Port) -> tcp_listener_started(IPAddress, Port) -> ok = mnesia:dirty_write( + rabbit_listener, #listener{node = node(), protocol = tcp, host = tcp_host(IPAddress), @@ -130,19 +131,20 @@ tcp_listener_started(IPAddress, Port) -> tcp_listener_stopped(IPAddress, Port) -> ok = mnesia:dirty_delete_object( + rabbit_listener, #listener{node = node(), protocol = tcp, host = tcp_host(IPAddress), port = Port}). active_listeners() -> - rabbit_misc:dirty_read_all(listener). + rabbit_misc:dirty_read_all(rabbit_listener). node_listeners(Node) -> - mnesia:dirty_read(listener, Node). + mnesia:dirty_read(rabbit_listener, Node). on_node_down(Node) -> - ok = mnesia:dirty_delete(listener, Node). + ok = mnesia:dirty_delete(rabbit_listener, Node). start_client(Sock) -> {ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []), diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index 6e332484..d0d60ddf 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -49,6 +49,8 @@ -define(LOG_BUNDLE_DELAY, 5). -define(COMPLETE_BUNDLE_DELAY, 2). +-define(HIBERNATE_AFTER, 10000). + -define(MAX_WRAP_ENTRIES, 500). -define(PERSISTER_LOG_FORMAT_VERSION, {2, 4}). @@ -93,7 +95,7 @@ start_link() -> transaction(MessageList) -> ?LOGDEBUG("transaction ~p~n", [MessageList]), TxnKey = rabbit_guid:guid(), - gen_server:call(?SERVER, {transaction, TxnKey, MessageList}). + gen_server:call(?SERVER, {transaction, TxnKey, MessageList}, infinity). extend_transaction(TxnKey, MessageList) -> ?LOGDEBUG("extend_transaction ~p ~p~n", [TxnKey, MessageList]), @@ -105,17 +107,17 @@ dirty_work(MessageList) -> commit_transaction(TxnKey) -> ?LOGDEBUG("commit_transaction ~p~n", [TxnKey]), - gen_server:call(?SERVER, {commit_transaction, TxnKey}). + gen_server:call(?SERVER, {commit_transaction, TxnKey}, infinity). rollback_transaction(TxnKey) -> ?LOGDEBUG("rollback_transaction ~p~n", [TxnKey]), gen_server:cast(?SERVER, {rollback_transaction, TxnKey}). force_snapshot() -> - gen_server:call(?SERVER, force_snapshot). + gen_server:call(?SERVER, force_snapshot, infinity). serial() -> - gen_server:call(?SERVER, serial). + gen_server:call(?SERVER, serial, infinity). %%-------------------------------------------------------------------- @@ -164,10 +166,8 @@ handle_call({transaction, Key, MessageList}, From, State) -> do_noreply(internal_commit(From, Key, NewState)); handle_call({commit_transaction, TxnKey}, From, State) -> do_noreply(internal_commit(From, TxnKey, State)); -handle_call(force_snapshot, _From, State = #pstate{log_handle = LH, - snapshot = Snapshot}) -> - ok = take_snapshot(LH, Snapshot), - do_reply(ok, State); +handle_call(force_snapshot, _From, State) -> + do_reply(ok, flush(true, State)); handle_call(serial, _From, State = #pstate{snapshot = #psnapshot{serial = Serial}}) -> do_reply(Serial, State); @@ -183,8 +183,13 @@ handle_cast({extend_transaction, TxnKey, MessageList}, State) -> handle_cast(_Msg, State) -> {noreply, State}. +handle_info(timeout, State = #pstate{deadline = infinity}) -> + State1 = flush(true, State), + %% TODO: Once we drop support for R11B-5, we can change this to + %% {noreply, State1, hibernate}; + proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State1]); handle_info(timeout, State) -> - {noreply, flush(State)}; + do_noreply(flush(State)); handle_info(_Info, State) -> {noreply, State}. @@ -275,12 +280,13 @@ take_snapshot_and_save_old(LogHandle, Snapshot) -> rabbit_log:info("Saving persister log in ~p~n", [OldFileName]), ok = take_snapshot(LogHandle, OldFileName, Snapshot). -maybe_take_snapshot(State = #pstate{entry_count = EntryCount, log_handle = LH, - snapshot = Snapshot}) - when EntryCount >= ?MAX_WRAP_ENTRIES -> +maybe_take_snapshot(Force, State = #pstate{entry_count = EntryCount, + log_handle = LH, + snapshot = Snapshot}) + when Force orelse EntryCount >= ?MAX_WRAP_ENTRIES -> ok = take_snapshot(LH, Snapshot), State#pstate{entry_count = 0}; -maybe_take_snapshot(State) -> +maybe_take_snapshot(_Force, State) -> State. later_ms(DeltaMilliSec) -> @@ -298,7 +304,7 @@ compute_deadline(_TimerDelay, ExistingDeadline) -> ExistingDeadline. compute_timeout(infinity) -> - infinity; + ?HIBERNATE_AFTER; compute_timeout(Deadline) -> DeltaMilliSec = time_diff(Deadline, now()) * 1000.0, if @@ -314,18 +320,18 @@ do_noreply(State = #pstate{deadline = Deadline}) -> do_reply(Reply, State = #pstate{deadline = Deadline}) -> {reply, Reply, State, compute_timeout(Deadline)}. -flush(State = #pstate{pending_logs = PendingLogs, - pending_replies = Waiting, - log_handle = LogHandle}) -> - State1 = if - PendingLogs /= [] -> +flush(State) -> flush(false, State). + +flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs, + pending_replies = Waiting, + log_handle = LogHandle}) -> + State1 = if PendingLogs /= [] -> disk_log:alog(LogHandle, lists:reverse(PendingLogs)), - maybe_take_snapshot( - State#pstate{ - entry_count = State#pstate.entry_count + 1}); - true -> + State#pstate{entry_count = State#pstate.entry_count + 1}; + true -> State end, + State2 = maybe_take_snapshot(ForceSnapshot, State1), if Waiting /= [] -> ok = disk_log:sync(LogHandle), lists:foreach(fun (From) -> gen_server:reply(From, ok) end, @@ -333,7 +339,7 @@ flush(State = #pstate{pending_logs = PendingLogs, true -> ok end, - State1#pstate{deadline = infinity, + State2#pstate{deadline = infinity, pending_logs = [], pending_replies = []}. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 3f8d7cac..ef8038e7 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -161,10 +161,10 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) -> {ok, Misc}. info(Pid) -> - gen_server:call(Pid, info). + gen_server:call(Pid, info, infinity). info(Pid, Items) -> - case gen_server:call(Pid, {info, Items}) of + case gen_server:call(Pid, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -173,7 +173,8 @@ setup_profiling() -> Value = rabbit_misc:get_config(profiling_enabled, false), case Value of once -> - rabbit_log:info("Enabling profiling for this connection, and disabling for subsequent.~n"), + rabbit_log:info("Enabling profiling for this connection, " + "and disabling for subsequent.~n"), rabbit_misc:set_config(profiling_enabled, false), fprof:trace(start); true -> @@ -230,8 +231,12 @@ start_connection(Parent, Deb, ClientSock) -> connection_state = pre_init}, handshake, 8)) catch - Ex -> rabbit_log:error("error on TCP connection ~p from ~s:~p~n~p~n", - [self(), PeerAddressS, PeerPort, Ex]) + Ex -> (if Ex == connection_closed_abruptly -> + fun rabbit_log:warning/2; + true -> + fun rabbit_log:error/2 + end)("exception on TCP connection ~p from ~s:~p~n~p~n", + [self(), PeerAddressS, PeerPort, Ex]) after rabbit_log:info("closing TCP connection ~p from ~s:~p~n", [self(), PeerAddressS, PeerPort]), @@ -283,6 +288,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> exit(Reason); {'EXIT', _Pid, E = {writer, send_failed, _Error}} -> throw(E); + {channel_exit, Channel, Reason} -> + mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State)); {'EXIT', Pid, Reason} -> mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State)); {terminate_channel, Channel, Ref1} -> @@ -350,6 +357,14 @@ terminate_channel(Channel, Ref, State) -> end, State. +handle_channel_exit(Channel, Reason, State) -> + %% We remove the channel from the inbound map only. That allows + %% the channel to be re-opened, but also means the remaining + %% cleanup, including possibly closing the connection, is deferred + %% until we get the (normal) exit signal. + erase({channel, Channel}), + handle_exception(State, Channel, Reason). + handle_dependent_exit(Pid, normal, State) -> channel_cleanup(Pid), maybe_close(State); @@ -404,7 +419,8 @@ wait_for_channel_termination(N, TimerRef) -> normal -> ok; _ -> rabbit_log:error( - "connection ~p, channel ~p - error while terminating:~n~p~n", + "connection ~p, channel ~p - " + "error while terminating:~n~p~n", [self(), Channel, Reason]) end, wait_for_channel_termination(N-1, TimerRef) @@ -709,8 +725,8 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> vhost = VHost}} = State, WriterPid = rabbit_writer:start(Sock, Channel, FrameMax), ChPid = rabbit_framing_channel:start_link( - fun rabbit_channel:start_link/4, - [self(), WriterPid, Username, VHost]), + fun rabbit_channel:start_link/5, + [Channel, self(), WriterPid, Username, VHost]), put({channel, Channel}, {chpid, ChPid}), put({chpid, ChPid}, {channel, Channel}), ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame); diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index ad653a2f..0b06a063 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -32,7 +32,7 @@ -module(rabbit_router). -include("rabbit.hrl"). --behaviour(gen_server). +-behaviour(gen_server2). -export([start_link/0, deliver/5]). @@ -58,7 +58,7 @@ %%---------------------------------------------------------------------------- start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). -ifdef(BUG19758). @@ -100,7 +100,7 @@ deliver_per_node(NodeQPids, Mandatory = false, Immediate = false, %% than the non-immediate case below. {ok, lists:flatmap( fun ({Node, QPids}) -> - gen_server:cast( + gen_server2:cast( {?SERVER, Node}, {deliver, QPids, Mandatory, Immediate, Txn, Message}), QPids @@ -110,9 +110,10 @@ deliver_per_node(NodeQPids, Mandatory, Immediate, Txn, Message) -> R = rabbit_misc:upmap( fun ({Node, QPids}) -> - try gen_server:call( + try gen_server2:call( {?SERVER, Node}, - {deliver, QPids, Mandatory, Immediate, Txn, Message}) + {deliver, QPids, Mandatory, Immediate, Txn, Message}, + infinity) catch _Class:_Reason -> %% TODO: figure out what to log (and do!) here @@ -143,7 +144,7 @@ handle_call({deliver, QPids, Mandatory, Immediate, Txn, Message}, spawn( fun () -> R = run_bindings(QPids, Mandatory, Immediate, Txn, Message), - gen_server:reply(From, R) + gen_server2:reply(From, R) end), {noreply, State}. diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl index 9e4c9c8a..2a365ce1 100644 --- a/src/rabbit_sasl_report_file_h.erl +++ b/src/rabbit_sasl_report_file_h.erl @@ -47,7 +47,7 @@ init({{File, Suffix}, []}) -> case rabbit_misc:append_file(File, Suffix) of ok -> ok; {error, Error} -> - rabbit_log:error("Failed to append contents of " ++ + rabbit_log:error("Failed to append contents of " "sasl log file '~s' to '~s':~n~p~n", [File, [File, Suffix], Error]) end, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 6706ecd1..8f0a3a89 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -45,6 +45,7 @@ test_content_prop_roundtrip(Datum, Binary) -> Binary = rabbit_binary_generator:encode_properties(Types, Values). %% assertion all_tests() -> + passed = test_priority_queue(), passed = test_parsing(), passed = test_topic_matching(), passed = test_log_management(), @@ -55,6 +56,62 @@ all_tests() -> passed = test_server_status(), passed. +test_priority_queue() -> + + false = priority_queue:is_queue(not_a_queue), + + %% empty Q + Q = priority_queue:new(), + {true, true, 0, [], []} = test_priority_queue(Q), + + %% 1-4 element no-priority Q + true = lists:all(fun (X) -> X =:= passed end, + lists:map(fun test_simple_n_element_queue/1, + lists:seq(1, 4))), + + %% 1-element priority Q + Q1 = priority_queue:in(foo, 1, priority_queue:new()), + {true, false, 1, [{1, foo}], [foo]} = test_priority_queue(Q1), + + %% 2-element same-priority Q + Q2 = priority_queue:in(bar, 1, Q1), + {true, false, 2, [{1, foo}, {1, bar}], [foo, bar]} = + test_priority_queue(Q2), + + %% 2-element different-priority Q + Q3 = priority_queue:in(bar, 2, Q1), + {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} = + test_priority_queue(Q3), + + %% 1-element negative priority Q + Q4 = priority_queue:in(foo, -1, priority_queue:new()), + {true, false, 1, [{-1, foo}], [foo]} = test_priority_queue(Q4), + + passed. + +priority_queue_in_all(Q, L) -> + lists:foldl(fun (X, Acc) -> priority_queue:in(X, Acc) end, Q, L). + +priority_queue_out_all(Q) -> + case priority_queue:out(Q) of + {empty, _} -> []; + {{value, V}, Q1} -> [V | priority_queue_out_all(Q1)] + end. + +test_priority_queue(Q) -> + {priority_queue:is_queue(Q), + priority_queue:is_empty(Q), + priority_queue:len(Q), + priority_queue:to_list(Q), + priority_queue_out_all(Q)}. + +test_simple_n_element_queue(N) -> + Items = lists:seq(1, N), + Q = priority_queue_in_all(priority_queue:new(), Items), + ToListRes = [{0, X} || X <- Items], + {true, false, N, ToListRes, Items} = test_priority_queue(Q), + passed. + test_parsing() -> passed = test_content_properties(), passed. @@ -450,17 +507,16 @@ test_user_management() -> {error, {no_such_vhost, _}} = control_action(delete_vhost, ["/testhost"]), {error, {no_such_user, _}} = - control_action(map_user_vhost, ["foo", "/"]), + control_action(set_permissions, ["foo", ".*", ".*", ".*"]), {error, {no_such_user, _}} = - control_action(unmap_user_vhost, ["foo", "/"]), + control_action(clear_permissions, ["foo"]), {error, {no_such_user, _}} = - control_action(list_user_vhosts, ["foo"]), - {error, {no_such_vhost, _}} = - control_action(map_user_vhost, ["guest", "/testhost"]), + control_action(list_user_permissions, ["foo"]), {error, {no_such_vhost, _}} = - control_action(unmap_user_vhost, ["guest", "/testhost"]), - {error, {no_such_vhost, _}} = - control_action(list_vhost_users, ["/testhost"]), + control_action(list_permissions, ["-p", "/testhost"]), + {error, {invalid_regexp, _, _}} = + control_action(set_permissions, ["guest", "+foo", ".*", ".*"]), + %% user creation ok = control_action(add_user, ["foo", "bar"]), {error, {user_already_exists, _}} = @@ -475,13 +531,16 @@ test_user_management() -> ok = control_action(list_vhosts, []), %% user/vhost mapping - ok = control_action(map_user_vhost, ["foo", "/testhost"]), - ok = control_action(map_user_vhost, ["foo", "/testhost"]), - ok = control_action(list_user_vhosts, ["foo"]), + ok = control_action(set_permissions, ["-p", "/testhost", + "foo", ".*", ".*", ".*"]), + ok = control_action(set_permissions, ["-p", "/testhost", + "foo", ".*", ".*", ".*"]), + ok = control_action(list_permissions, ["-p", "/testhost"]), + ok = control_action(list_user_permissions, ["foo"]), %% user/vhost unmapping - ok = control_action(unmap_user_vhost, ["foo", "/testhost"]), - ok = control_action(unmap_user_vhost, ["foo", "/testhost"]), + ok = control_action(clear_permissions, ["-p", "/testhost", "foo"]), + ok = control_action(clear_permissions, ["-p", "/testhost", "foo"]), %% vhost deletion ok = control_action(delete_vhost, ["/testhost"]), @@ -490,7 +549,8 @@ test_user_management() -> %% deleting a populated vhost ok = control_action(add_vhost, ["/testhost"]), - ok = control_action(map_user_vhost, ["foo", "/testhost"]), + ok = control_action(set_permissions, ["-p", "/testhost", + "foo", ".*", ".*", ".*"]), ok = control_action(delete_vhost, ["/testhost"]), %% user deletion |