summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorTony Garnock-Jones <tonyg@lshift.net>2009-05-18 17:34:25 +0100
committerTony Garnock-Jones <tonyg@lshift.net>2009-05-18 17:34:25 +0100
commit0f45dfdeb7dc41be14e57e6a4dd6490728349ea2 (patch)
treeeddc87d08b0c86b4658dfb31ab466be809b8b9e8 /src
parent17c60aec273b88d3f36d3f63a659025ea687a961 (diff)
parent3f3b658532cc348bd932ae358bfb2e8b66bdec2c (diff)
downloadrabbitmq-server-0f45dfdeb7dc41be14e57e6a4dd6490728349ea2.tar.gz
merge v1_5 into default
Diffstat (limited to 'src')
-rw-r--r--src/buffering_proxy.erl108
-rw-r--r--src/gen_server2.erl898
-rw-r--r--src/priority_queue.erl153
-rw-r--r--src/rabbit.erl42
-rw-r--r--src/rabbit_access_control.erl189
-rw-r--r--src/rabbit_alarm.erl11
-rw-r--r--src/rabbit_amqqueue.erl96
-rw-r--r--src/rabbit_amqqueue_process.erl178
-rw-r--r--src/rabbit_channel.erl310
-rw-r--r--src/rabbit_control.erl93
-rw-r--r--src/rabbit_error_logger_file_h.erl2
-rw-r--r--src/rabbit_exchange.erl234
-rw-r--r--src/rabbit_framing_channel.erl6
-rw-r--r--src/rabbit_guid.erl3
-rw-r--r--src/rabbit_limiter.erl195
-rw-r--r--src/rabbit_misc.erl36
-rw-r--r--src/rabbit_mnesia.erl75
-rw-r--r--src/rabbit_networking.erl8
-rw-r--r--src/rabbit_persister.erl54
-rw-r--r--src/rabbit_reader.erl32
-rw-r--r--src/rabbit_router.erl13
-rw-r--r--src/rabbit_sasl_report_file_h.erl2
-rw-r--r--src/rabbit_tests.erl88
23 files changed, 2208 insertions, 618 deletions
diff --git a/src/buffering_proxy.erl b/src/buffering_proxy.erl
deleted file mode 100644
index 344b719a..00000000
--- a/src/buffering_proxy.erl
+++ /dev/null
@@ -1,108 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(buffering_proxy).
-
--export([start_link/2]).
-
-%% internal
-
--export([mainloop/4, drain/2]).
--export([proxy_loop/3]).
-
--define(HIBERNATE_AFTER, 5000).
-
-%%----------------------------------------------------------------------------
-
-start_link(M, A) ->
- spawn_link(
- fun () -> process_flag(trap_exit, true),
- ProxyPid = self(),
- Ref = make_ref(),
- Pid = spawn_link(
- fun () -> ProxyPid ! Ref,
- mainloop(ProxyPid, Ref, M,
- M:init(ProxyPid, A)) end),
- proxy_loop(Ref, Pid, empty)
- end).
-
-%%----------------------------------------------------------------------------
-
-mainloop(ProxyPid, Ref, M, State) ->
- NewState =
- receive
- {Ref, Messages} ->
- NewSt =
- lists:foldl(fun (Msg, S) ->
- drain(M, M:handle_message(Msg, S))
- end, State, lists:reverse(Messages)),
- ProxyPid ! Ref,
- NewSt;
- Msg -> M:handle_message(Msg, State)
- after ?HIBERNATE_AFTER ->
- erlang:hibernate(?MODULE, mainloop,
- [ProxyPid, Ref, M, State])
- end,
- ?MODULE:mainloop(ProxyPid, Ref, M, NewState).
-
-drain(M, State) ->
- receive
- Msg -> ?MODULE:drain(M, M:handle_message(Msg, State))
- after 0 ->
- State
- end.
-
-proxy_loop(Ref, Pid, State) ->
- receive
- Ref ->
- ?MODULE:proxy_loop(
- Ref, Pid,
- case State of
- empty -> waiting;
- waiting -> exit(duplicate_next);
- Messages -> Pid ! {Ref, Messages}, empty
- end);
- {'EXIT', Pid, Reason} ->
- exit(Reason);
- {'EXIT', _, Reason} ->
- exit(Pid, Reason),
- ?MODULE:proxy_loop(Ref, Pid, State);
- Msg ->
- ?MODULE:proxy_loop(
- Ref, Pid,
- case State of
- empty -> [Msg];
- waiting -> Pid ! {Ref, [Msg]}, empty;
- Messages -> [Msg | Messages]
- end)
- after ?HIBERNATE_AFTER ->
- erlang:hibernate(?MODULE, proxy_loop, [Ref, Pid, State])
- end.
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
new file mode 100644
index 00000000..ba8becfc
--- /dev/null
+++ b/src/gen_server2.erl
@@ -0,0 +1,898 @@
+%% This file is a copy of gen_server.erl from the R11B-5 Erlang/OTP
+%% distribution, with the following modifications:
+%%
+%% 1) the module name is gen_server2
+%%
+%% 2) more efficient handling of selective receives in callbacks
+%% gen_server2 processes drain their message queue into an internal
+%% buffer before invoking any callback module functions. Messages are
+%% dequeued from the buffer for processing. Thus the effective message
+%% queue of a gen_server2 process is the concatenation of the internal
+%% buffer and the real message queue.
+%% As a result of the draining, any selective receive invoked inside a
+%% callback is less likely to have to scan a large message queue.
+%%
+%% 3) gen_server2:cast is guaranteed to be order-preserving
+%% The original code could reorder messages when communicating with a
+%% process on a remote node that was not currently connected.
+%%
+%% 4) The new functions gen_server2:pcall/3, pcall/4, and pcast/3
+%% allow callers to attach priorities to requests. Requests with
+%% higher priorities are processed before requests with lower
+%% priorities. The default priority is 0.
+%%
+%% All modifications are (C) 2009 LShift Ltd.
+
+%% ``The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved via the world wide web at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
+%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
+%% AB. All Rights Reserved.''
+%%
+%% $Id$
+%%
+-module(gen_server2).
+
+%%% ---------------------------------------------------
+%%%
+%%% The idea behind THIS server is that the user module
+%%% provides (different) functions to handle different
+%%% kind of inputs.
+%%% If the Parent process terminates the Module:terminate/2
+%%% function is called.
+%%%
+%%% The user module should export:
+%%%
+%%% init(Args)
+%%% ==> {ok, State}
+%%% {ok, State, Timeout}
+%%% ignore
+%%% {stop, Reason}
+%%%
+%%% handle_call(Msg, {From, Tag}, State)
+%%%
+%%% ==> {reply, Reply, State}
+%%% {reply, Reply, State, Timeout}
+%%% {noreply, State}
+%%% {noreply, State, Timeout}
+%%% {stop, Reason, Reply, State}
+%%% Reason = normal | shutdown | Term terminate(State) is called
+%%%
+%%% handle_cast(Msg, State)
+%%%
+%%% ==> {noreply, State}
+%%% {noreply, State, Timeout}
+%%% {stop, Reason, State}
+%%% Reason = normal | shutdown | Term terminate(State) is called
+%%%
+%%% handle_info(Info, State) Info is e.g. {'EXIT', P, R}, {nodedown, N}, ...
+%%%
+%%% ==> {noreply, State}
+%%% {noreply, State, Timeout}
+%%% {stop, Reason, State}
+%%% Reason = normal | shutdown | Term, terminate(State) is called
+%%%
+%%% terminate(Reason, State) Let the user module clean up
+%%% always called when server terminates
+%%%
+%%% ==> ok
+%%%
+%%%
+%%% The work flow (of the server) can be described as follows:
+%%%
+%%% User module Generic
+%%% ----------- -------
+%%% start -----> start
+%%% init <----- .
+%%%
+%%% loop
+%%% handle_call <----- .
+%%% -----> reply
+%%%
+%%% handle_cast <----- .
+%%%
+%%% handle_info <----- .
+%%%
+%%% terminate <----- .
+%%%
+%%% -----> reply
+%%%
+%%%
+%%% ---------------------------------------------------
+
+%% API
+-export([start/3, start/4,
+ start_link/3, start_link/4,
+ call/2, call/3, pcall/3, pcall/4,
+ cast/2, pcast/3, reply/2,
+ abcast/2, abcast/3,
+ multi_call/2, multi_call/3, multi_call/4,
+ enter_loop/3, enter_loop/4, enter_loop/5]).
+
+-export([behaviour_info/1]).
+
+%% System exports
+-export([system_continue/3,
+ system_terminate/4,
+ system_code_change/4,
+ format_status/2]).
+
+%% Internal exports
+-export([init_it/6, print_event/3]).
+
+-import(error_logger, [format/2]).
+
+%%%=========================================================================
+%%% API
+%%%=========================================================================
+
+behaviour_info(callbacks) ->
+ [{init,1},{handle_call,3},{handle_cast,2},{handle_info,2},
+ {terminate,2},{code_change,3}];
+behaviour_info(_Other) ->
+ undefined.
+
+%%% -----------------------------------------------------------------
+%%% Starts a generic server.
+%%% start(Mod, Args, Options)
+%%% start(Name, Mod, Args, Options)
+%%% start_link(Mod, Args, Options)
+%%% start_link(Name, Mod, Args, Options) where:
+%%% Name ::= {local, atom()} | {global, atom()}
+%%% Mod ::= atom(), callback module implementing the 'real' server
+%%% Args ::= term(), init arguments (to Mod:init/1)
+%%% Options ::= [{timeout, Timeout} | {debug, [Flag]}]
+%%% Flag ::= trace | log | {logfile, File} | statistics | debug
+%%% (debug == log && statistics)
+%%% Returns: {ok, Pid} |
+%%% {error, {already_started, Pid}} |
+%%% {error, Reason}
+%%% -----------------------------------------------------------------
+start(Mod, Args, Options) ->
+ gen:start(?MODULE, nolink, Mod, Args, Options).
+
+start(Name, Mod, Args, Options) ->
+ gen:start(?MODULE, nolink, Name, Mod, Args, Options).
+
+start_link(Mod, Args, Options) ->
+ gen:start(?MODULE, link, Mod, Args, Options).
+
+start_link(Name, Mod, Args, Options) ->
+ gen:start(?MODULE, link, Name, Mod, Args, Options).
+
+
+%% -----------------------------------------------------------------
+%% Make a call to a generic server.
+%% If the server is located at another node, that node will
+%% be monitored.
+%% If the client is trapping exits and is linked server termination
+%% is handled here (? Shall we do that here (or rely on timeouts) ?).
+%% -----------------------------------------------------------------
+call(Name, Request) ->
+ case catch gen:call(Name, '$gen_call', Request) of
+ {ok,Res} ->
+ Res;
+ {'EXIT',Reason} ->
+ exit({Reason, {?MODULE, call, [Name, Request]}})
+ end.
+
+call(Name, Request, Timeout) ->
+ case catch gen:call(Name, '$gen_call', Request, Timeout) of
+ {ok,Res} ->
+ Res;
+ {'EXIT',Reason} ->
+ exit({Reason, {?MODULE, call, [Name, Request, Timeout]}})
+ end.
+
+pcall(Name, Priority, Request) ->
+ case catch gen:call(Name, '$gen_pcall', {Priority, Request}) of
+ {ok,Res} ->
+ Res;
+ {'EXIT',Reason} ->
+ exit({Reason, {?MODULE, pcall, [Name, Priority, Request]}})
+ end.
+
+pcall(Name, Priority, Request, Timeout) ->
+ case catch gen:call(Name, '$gen_pcall', {Priority, Request}, Timeout) of
+ {ok,Res} ->
+ Res;
+ {'EXIT',Reason} ->
+ exit({Reason, {?MODULE, pcall, [Name, Priority, Request, Timeout]}})
+ end.
+
+%% -----------------------------------------------------------------
+%% Make a cast to a generic server.
+%% -----------------------------------------------------------------
+cast({global,Name}, Request) ->
+ catch global:send(Name, cast_msg(Request)),
+ ok;
+cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) ->
+ do_cast(Dest, Request);
+cast(Dest, Request) when is_atom(Dest) ->
+ do_cast(Dest, Request);
+cast(Dest, Request) when is_pid(Dest) ->
+ do_cast(Dest, Request).
+
+do_cast(Dest, Request) ->
+ do_send(Dest, cast_msg(Request)),
+ ok.
+
+cast_msg(Request) -> {'$gen_cast',Request}.
+
+pcast({global,Name}, Priority, Request) ->
+ catch global:send(Name, cast_msg(Priority, Request)),
+ ok;
+pcast({Name,Node}=Dest, Priority, Request) when is_atom(Name), is_atom(Node) ->
+ do_cast(Dest, Priority, Request);
+pcast(Dest, Priority, Request) when is_atom(Dest) ->
+ do_cast(Dest, Priority, Request);
+pcast(Dest, Priority, Request) when is_pid(Dest) ->
+ do_cast(Dest, Priority, Request).
+
+do_cast(Dest, Priority, Request) ->
+ do_send(Dest, cast_msg(Priority, Request)),
+ ok.
+
+cast_msg(Priority, Request) -> {'$gen_pcast', {Priority, Request}}.
+
+%% -----------------------------------------------------------------
+%% Send a reply to the client.
+%% -----------------------------------------------------------------
+reply({To, Tag}, Reply) ->
+ catch To ! {Tag, Reply}.
+
+%% -----------------------------------------------------------------
+%% Asyncronous broadcast, returns nothing, it's just send'n prey
+%%-----------------------------------------------------------------
+abcast(Name, Request) when is_atom(Name) ->
+ do_abcast([node() | nodes()], Name, cast_msg(Request)).
+
+abcast(Nodes, Name, Request) when is_list(Nodes), is_atom(Name) ->
+ do_abcast(Nodes, Name, cast_msg(Request)).
+
+do_abcast([Node|Nodes], Name, Msg) when is_atom(Node) ->
+ do_send({Name,Node},Msg),
+ do_abcast(Nodes, Name, Msg);
+do_abcast([], _,_) -> abcast.
+
+%%% -----------------------------------------------------------------
+%%% Make a call to servers at several nodes.
+%%% Returns: {[Replies],[BadNodes]}
+%%% A Timeout can be given
+%%%
+%%% A middleman process is used in case late answers arrives after
+%%% the timeout. If they would be allowed to glog the callers message
+%%% queue, it would probably become confused. Late answers will
+%%% now arrive to the terminated middleman and so be discarded.
+%%% -----------------------------------------------------------------
+multi_call(Name, Req)
+ when is_atom(Name) ->
+ do_multi_call([node() | nodes()], Name, Req, infinity).
+
+multi_call(Nodes, Name, Req)
+ when is_list(Nodes), is_atom(Name) ->
+ do_multi_call(Nodes, Name, Req, infinity).
+
+multi_call(Nodes, Name, Req, infinity) ->
+ do_multi_call(Nodes, Name, Req, infinity);
+multi_call(Nodes, Name, Req, Timeout)
+ when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 ->
+ do_multi_call(Nodes, Name, Req, Timeout).
+
+
+%%-----------------------------------------------------------------
+%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>) ->_
+%%
+%% Description: Makes an existing process into a gen_server.
+%% The calling process will enter the gen_server receive
+%% loop and become a gen_server process.
+%% The process *must* have been started using one of the
+%% start functions in proc_lib, see proc_lib(3).
+%% The user is responsible for any initialization of the
+%% process, including registering a name for it.
+%%-----------------------------------------------------------------
+enter_loop(Mod, Options, State) ->
+ enter_loop(Mod, Options, State, self(), infinity).
+
+enter_loop(Mod, Options, State, ServerName = {_, _}) ->
+ enter_loop(Mod, Options, State, ServerName, infinity);
+
+enter_loop(Mod, Options, State, Timeout) ->
+ enter_loop(Mod, Options, State, self(), Timeout).
+
+enter_loop(Mod, Options, State, ServerName, Timeout) ->
+ Name = get_proc_name(ServerName),
+ Parent = get_parent(),
+ Debug = debug_options(Name, Options),
+ Queue = priority_queue:new(),
+ loop(Parent, Name, State, Mod, Timeout, Queue, Debug).
+
+%%%========================================================================
+%%% Gen-callback functions
+%%%========================================================================
+
+%%% ---------------------------------------------------
+%%% Initiate the new process.
+%%% Register the name using the Rfunc function
+%%% Calls the Mod:init/Args function.
+%%% Finally an acknowledge is sent to Parent and the main
+%%% loop is entered.
+%%% ---------------------------------------------------
+init_it(Starter, self, Name, Mod, Args, Options) ->
+ init_it(Starter, self(), Name, Mod, Args, Options);
+init_it(Starter, Parent, Name, Mod, Args, Options) ->
+ Debug = debug_options(Name, Options),
+ Queue = priority_queue:new(),
+ case catch Mod:init(Args) of
+ {ok, State} ->
+ proc_lib:init_ack(Starter, {ok, self()}),
+ loop(Parent, Name, State, Mod, infinity, Queue, Debug);
+ {ok, State, Timeout} ->
+ proc_lib:init_ack(Starter, {ok, self()}),
+ loop(Parent, Name, State, Mod, Timeout, Queue, Debug);
+ {stop, Reason} ->
+ proc_lib:init_ack(Starter, {error, Reason}),
+ exit(Reason);
+ ignore ->
+ proc_lib:init_ack(Starter, ignore),
+ exit(normal);
+ {'EXIT', Reason} ->
+ proc_lib:init_ack(Starter, {error, Reason}),
+ exit(Reason);
+ Else ->
+ Error = {bad_return_value, Else},
+ proc_lib:init_ack(Starter, {error, Error}),
+ exit(Error)
+ end.
+
+%%%========================================================================
+%%% Internal functions
+%%%========================================================================
+%%% ---------------------------------------------------
+%%% The MAIN loop.
+%%% ---------------------------------------------------
+loop(Parent, Name, State, Mod, Time, Queue, Debug) ->
+ receive
+ Input -> loop(Parent, Name, State, Mod,
+ Time, in(Input, Queue), Debug)
+ after 0 ->
+ case priority_queue:out(Queue) of
+ {{value, Msg}, Queue1} ->
+ process_msg(Parent, Name, State, Mod,
+ Time, Queue1, Debug, Msg);
+ {empty, Queue1} ->
+ receive
+ Input ->
+ loop(Parent, Name, State, Mod,
+ Time, in(Input, Queue1), Debug)
+ after Time ->
+ process_msg(Parent, Name, State, Mod,
+ Time, Queue1, Debug, timeout)
+ end
+ end
+ end.
+
+in({'$gen_pcast', {Priority, Msg}}, Queue) ->
+ priority_queue:in({'$gen_cast', Msg}, Priority, Queue);
+in({'$gen_pcall', From, {Priority, Msg}}, Queue) ->
+ priority_queue:in({'$gen_call', From, Msg}, Priority, Queue);
+in(Input, Queue) ->
+ priority_queue:in(Input, Queue).
+
+process_msg(Parent, Name, State, Mod, Time, Queue, Debug, Msg) ->
+ case Msg of
+ {system, From, Req} ->
+ sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
+ [Name, State, Mod, Time, Queue]);
+ {'EXIT', Parent, Reason} ->
+ terminate(Reason, Name, Msg, Mod, State, Debug);
+ _Msg when Debug =:= [] ->
+ handle_msg(Msg, Parent, Name, State, Mod, Time, Queue);
+ _Msg ->
+ Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
+ Name, {in, Msg}),
+ handle_msg(Msg, Parent, Name, State, Mod, Time, Queue, Debug1)
+ end.
+
+%%% ---------------------------------------------------
+%%% Send/recive functions
+%%% ---------------------------------------------------
+do_send(Dest, Msg) ->
+ catch erlang:send(Dest, Msg).
+
+do_multi_call(Nodes, Name, Req, infinity) ->
+ Tag = make_ref(),
+ Monitors = send_nodes(Nodes, Name, Tag, Req),
+ rec_nodes(Tag, Monitors, Name, undefined);
+do_multi_call(Nodes, Name, Req, Timeout) ->
+ Tag = make_ref(),
+ Caller = self(),
+ Receiver =
+ spawn(
+ fun() ->
+ %% Middleman process. Should be unsensitive to regular
+ %% exit signals. The sychronization is needed in case
+ %% the receiver would exit before the caller started
+ %% the monitor.
+ process_flag(trap_exit, true),
+ Mref = erlang:monitor(process, Caller),
+ receive
+ {Caller,Tag} ->
+ Monitors = send_nodes(Nodes, Name, Tag, Req),
+ TimerId = erlang:start_timer(Timeout, self(), ok),
+ Result = rec_nodes(Tag, Monitors, Name, TimerId),
+ exit({self(),Tag,Result});
+ {'DOWN',Mref,_,_,_} ->
+ %% Caller died before sending us the go-ahead.
+ %% Give up silently.
+ exit(normal)
+ end
+ end),
+ Mref = erlang:monitor(process, Receiver),
+ Receiver ! {self(),Tag},
+ receive
+ {'DOWN',Mref,_,_,{Receiver,Tag,Result}} ->
+ Result;
+ {'DOWN',Mref,_,_,Reason} ->
+ %% The middleman code failed. Or someone did
+ %% exit(_, kill) on the middleman process => Reason==killed
+ exit(Reason)
+ end.
+
+send_nodes(Nodes, Name, Tag, Req) ->
+ send_nodes(Nodes, Name, Tag, Req, []).
+
+send_nodes([Node|Tail], Name, Tag, Req, Monitors)
+ when is_atom(Node) ->
+ Monitor = start_monitor(Node, Name),
+ %% Handle non-existing names in rec_nodes.
+ catch {Name, Node} ! {'$gen_call', {self(), {Tag, Node}}, Req},
+ send_nodes(Tail, Name, Tag, Req, [Monitor | Monitors]);
+send_nodes([_Node|Tail], Name, Tag, Req, Monitors) ->
+ %% Skip non-atom Node
+ send_nodes(Tail, Name, Tag, Req, Monitors);
+send_nodes([], _Name, _Tag, _Req, Monitors) ->
+ Monitors.
+
+%% Against old nodes:
+%% If no reply has been delivered within 2 secs. (per node) check that
+%% the server really exists and wait for ever for the answer.
+%%
+%% Against contemporary nodes:
+%% Wait for reply, server 'DOWN', or timeout from TimerId.
+
+rec_nodes(Tag, Nodes, Name, TimerId) ->
+ rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId).
+
+rec_nodes(Tag, [{N,R}|Tail], Name, Badnodes, Replies, Time, TimerId ) ->
+ receive
+ {'DOWN', R, _, _, _} ->
+ rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId);
+ {{Tag, N}, Reply} -> %% Tag is bound !!!
+ unmonitor(R),
+ rec_nodes(Tag, Tail, Name, Badnodes,
+ [{N,Reply}|Replies], Time, TimerId);
+ {timeout, TimerId, _} ->
+ unmonitor(R),
+ %% Collect all replies that already have arrived
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
+ end;
+rec_nodes(Tag, [N|Tail], Name, Badnodes, Replies, Time, TimerId) ->
+ %% R6 node
+ receive
+ {nodedown, N} ->
+ monitor_node(N, false),
+ rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId);
+ {{Tag, N}, Reply} -> %% Tag is bound !!!
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ rec_nodes(Tag, Tail, Name, Badnodes,
+ [{N,Reply}|Replies], 2000, TimerId);
+ {timeout, TimerId, _} ->
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ %% Collect all replies that already have arrived
+ rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies)
+ after Time ->
+ case rpc:call(N, erlang, whereis, [Name]) of
+ Pid when is_pid(Pid) -> % It exists try again.
+ rec_nodes(Tag, [N|Tail], Name, Badnodes,
+ Replies, infinity, TimerId);
+ _ -> % badnode
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ rec_nodes(Tag, Tail, Name, [N|Badnodes],
+ Replies, 2000, TimerId)
+ end
+ end;
+rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) ->
+ case catch erlang:cancel_timer(TimerId) of
+ false -> % It has already sent it's message
+ receive
+ {timeout, TimerId, _} -> ok
+ after 0 ->
+ ok
+ end;
+ _ -> % Timer was cancelled, or TimerId was 'undefined'
+ ok
+ end,
+ {Replies, Badnodes}.
+
+%% Collect all replies that already have arrived
+rec_nodes_rest(Tag, [{N,R}|Tail], Name, Badnodes, Replies) ->
+ receive
+ {'DOWN', R, _, _, _} ->
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
+ {{Tag, N}, Reply} -> %% Tag is bound !!!
+ unmonitor(R),
+ rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
+ after 0 ->
+ unmonitor(R),
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
+ end;
+rec_nodes_rest(Tag, [N|Tail], Name, Badnodes, Replies) ->
+ %% R6 node
+ receive
+ {nodedown, N} ->
+ monitor_node(N, false),
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
+ {{Tag, N}, Reply} -> %% Tag is bound !!!
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
+ after 0 ->
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
+ end;
+rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) ->
+ {Replies, Badnodes}.
+
+
+%%% ---------------------------------------------------
+%%% Monitor functions
+%%% ---------------------------------------------------
+
+start_monitor(Node, Name) when is_atom(Node), is_atom(Name) ->
+ if node() =:= nonode@nohost, Node =/= nonode@nohost ->
+ Ref = make_ref(),
+ self() ! {'DOWN', Ref, process, {Name, Node}, noconnection},
+ {Node, Ref};
+ true ->
+ case catch erlang:monitor(process, {Name, Node}) of
+ {'EXIT', _} ->
+ %% Remote node is R6
+ monitor_node(Node, true),
+ Node;
+ Ref when is_reference(Ref) ->
+ {Node, Ref}
+ end
+ end.
+
+%% Cancels a monitor started with Ref=erlang:monitor(_, _).
+unmonitor(Ref) when is_reference(Ref) ->
+ erlang:demonitor(Ref),
+ receive
+ {'DOWN', Ref, _, _, _} ->
+ true
+ after 0 ->
+ true
+ end.
+
+%%% ---------------------------------------------------
+%%% Message handling functions
+%%% ---------------------------------------------------
+
+dispatch({'$gen_cast', Msg}, Mod, State) ->
+ Mod:handle_cast(Msg, State);
+dispatch(Info, Mod, State) ->
+ Mod:handle_info(Info, State).
+
+handle_msg({'$gen_call', From, Msg},
+ Parent, Name, State, Mod, _Time, Queue) ->
+ case catch Mod:handle_call(Msg, From, State) of
+ {reply, Reply, NState} ->
+ reply(From, Reply),
+ loop(Parent, Name, NState, Mod, infinity, Queue, []);
+ {reply, Reply, NState, Time1} ->
+ reply(From, Reply),
+ loop(Parent, Name, NState, Mod, Time1, Queue, []);
+ {noreply, NState} ->
+ loop(Parent, Name, NState, Mod, infinity, Queue, []);
+ {noreply, NState, Time1} ->
+ loop(Parent, Name, NState, Mod, Time1, Queue, []);
+ {stop, Reason, Reply, NState} ->
+ {'EXIT', R} =
+ (catch terminate(Reason, Name, Msg, Mod, NState, [])),
+ reply(From, Reply),
+ exit(R);
+ Other -> handle_common_reply(Other,
+ Parent, Name, Msg, Mod, State, Queue)
+ end;
+handle_msg(Msg,
+ Parent, Name, State, Mod, _Time, Queue) ->
+ Reply = (catch dispatch(Msg, Mod, State)),
+ handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue).
+
+handle_msg({'$gen_call', From, Msg},
+ Parent, Name, State, Mod, _Time, Queue, Debug) ->
+ case catch Mod:handle_call(Msg, From, State) of
+ {reply, Reply, NState} ->
+ Debug1 = reply(Name, From, Reply, NState, Debug),
+ loop(Parent, Name, NState, Mod, infinity, Queue, Debug1);
+ {reply, Reply, NState, Time1} ->
+ Debug1 = reply(Name, From, Reply, NState, Debug),
+ loop(Parent, Name, NState, Mod, Time1, Queue, Debug1);
+ {noreply, NState} ->
+ Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(Parent, Name, NState, Mod, infinity, Queue, Debug1);
+ {noreply, NState, Time1} ->
+ Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(Parent, Name, NState, Mod, Time1, Queue, Debug1);
+ {stop, Reason, Reply, NState} ->
+ {'EXIT', R} =
+ (catch terminate(Reason, Name, Msg, Mod, NState, Debug)),
+ reply(Name, From, Reply, NState, Debug),
+ exit(R);
+ Other ->
+ handle_common_reply(Other,
+ Parent, Name, Msg, Mod, State, Queue, Debug)
+ end;
+handle_msg(Msg,
+ Parent, Name, State, Mod, _Time, Queue, Debug) ->
+ Reply = (catch dispatch(Msg, Mod, State)),
+ handle_common_reply(Reply,
+ Parent, Name, Msg, Mod, State, Queue, Debug).
+
+handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue) ->
+ case Reply of
+ {noreply, NState} ->
+ loop(Parent, Name, NState, Mod, infinity, Queue, []);
+ {noreply, NState, Time1} ->
+ loop(Parent, Name, NState, Mod, Time1, Queue, []);
+ {stop, Reason, NState} ->
+ terminate(Reason, Name, Msg, Mod, NState, []);
+ {'EXIT', What} ->
+ terminate(What, Name, Msg, Mod, State, []);
+ _ ->
+ terminate({bad_return_value, Reply}, Name, Msg, Mod, State, [])
+ end.
+
+handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue, Debug) ->
+ case Reply of
+ {noreply, NState} ->
+ Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(Parent, Name, NState, Mod, infinity, Queue, Debug1);
+ {noreply, NState, Time1} ->
+ Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(Parent, Name, NState, Mod, Time1, Queue, Debug1);
+ {stop, Reason, NState} ->
+ terminate(Reason, Name, Msg, Mod, NState, Debug);
+ {'EXIT', What} ->
+ terminate(What, Name, Msg, Mod, State, Debug);
+ _ ->
+ terminate({bad_return_value, Reply}, Name, Msg, Mod, State, Debug)
+ end.
+
+reply(Name, {To, Tag}, Reply, State, Debug) ->
+ reply({To, Tag}, Reply),
+ sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+ {out, Reply, To, State} ).
+
+
+%%-----------------------------------------------------------------
+%% Callback functions for system messages handling.
+%%-----------------------------------------------------------------
+system_continue(Parent, Debug, [Name, State, Mod, Time, Queue]) ->
+ loop(Parent, Name, State, Mod, Time, Queue, Debug).
+
+system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, _Queue]) ->
+ terminate(Reason, Name, [], Mod, State, Debug).
+
+system_code_change([Name, State, Mod, Time, Queue], _Module, OldVsn, Extra) ->
+ case catch Mod:code_change(OldVsn, State, Extra) of
+ {ok, NewState} -> {ok, [Name, NewState, Mod, Time, Queue]};
+ Else -> Else
+ end.
+
+%%-----------------------------------------------------------------
+%% Format debug messages. Print them as the call-back module sees
+%% them, not as the real erlang messages. Use trace for that.
+%%-----------------------------------------------------------------
+print_event(Dev, {in, Msg}, Name) ->
+ case Msg of
+ {'$gen_call', {From, _Tag}, Call} ->
+ io:format(Dev, "*DBG* ~p got call ~p from ~w~n",
+ [Name, Call, From]);
+ {'$gen_cast', Cast} ->
+ io:format(Dev, "*DBG* ~p got cast ~p~n",
+ [Name, Cast]);
+ _ ->
+ io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg])
+ end;
+print_event(Dev, {out, Msg, To, State}, Name) ->
+ io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n",
+ [Name, Msg, To, State]);
+print_event(Dev, {noreply, State}, Name) ->
+ io:format(Dev, "*DBG* ~p new state ~w~n", [Name, State]);
+print_event(Dev, Event, Name) ->
+ io:format(Dev, "*DBG* ~p dbg ~p~n", [Name, Event]).
+
+
+%%% ---------------------------------------------------
+%%% Terminate the server.
+%%% ---------------------------------------------------
+
+terminate(Reason, Name, Msg, Mod, State, Debug) ->
+ case catch Mod:terminate(Reason, State) of
+ {'EXIT', R} ->
+ error_info(R, Name, Msg, State, Debug),
+ exit(R);
+ _ ->
+ case Reason of
+ normal ->
+ exit(normal);
+ shutdown ->
+ exit(shutdown);
+ _ ->
+ error_info(Reason, Name, Msg, State, Debug),
+ exit(Reason)
+ end
+ end.
+
+error_info(_Reason, application_controller, _Msg, _State, _Debug) ->
+ %% OTP-5811 Don't send an error report if it's the system process
+ %% application_controller which is terminating - let init take care
+ %% of it instead
+ ok;
+error_info(Reason, Name, Msg, State, Debug) ->
+ Reason1 =
+ case Reason of
+ {undef,[{M,F,A}|MFAs]} ->
+ case code:is_loaded(M) of
+ false ->
+ {'module could not be loaded',[{M,F,A}|MFAs]};
+ _ ->
+ case erlang:function_exported(M, F, length(A)) of
+ true ->
+ Reason;
+ false ->
+ {'function not exported',[{M,F,A}|MFAs]}
+ end
+ end;
+ _ ->
+ Reason
+ end,
+ format("** Generic server ~p terminating \n"
+ "** Last message in was ~p~n"
+ "** When Server state == ~p~n"
+ "** Reason for termination == ~n** ~p~n",
+ [Name, Msg, State, Reason1]),
+ sys:print_log(Debug),
+ ok.
+
+%%% ---------------------------------------------------
+%%% Misc. functions.
+%%% ---------------------------------------------------
+
+opt(Op, [{Op, Value}|_]) ->
+ {ok, Value};
+opt(Op, [_|Options]) ->
+ opt(Op, Options);
+opt(_, []) ->
+ false.
+
+debug_options(Name, Opts) ->
+ case opt(debug, Opts) of
+ {ok, Options} -> dbg_options(Name, Options);
+ _ -> dbg_options(Name, [])
+ end.
+
+dbg_options(Name, []) ->
+ Opts =
+ case init:get_argument(generic_debug) of
+ error ->
+ [];
+ _ ->
+ [log, statistics]
+ end,
+ dbg_opts(Name, Opts);
+dbg_options(Name, Opts) ->
+ dbg_opts(Name, Opts).
+
+dbg_opts(Name, Opts) ->
+ case catch sys:debug_options(Opts) of
+ {'EXIT',_} ->
+ format("~p: ignoring erroneous debug options - ~p~n",
+ [Name, Opts]),
+ [];
+ Dbg ->
+ Dbg
+ end.
+
+get_proc_name(Pid) when is_pid(Pid) ->
+ Pid;
+get_proc_name({local, Name}) ->
+ case process_info(self(), registered_name) of
+ {registered_name, Name} ->
+ Name;
+ {registered_name, _Name} ->
+ exit(process_not_registered);
+ [] ->
+ exit(process_not_registered)
+ end;
+get_proc_name({global, Name}) ->
+ case global:safe_whereis_name(Name) of
+ undefined ->
+ exit(process_not_registered_globally);
+ Pid when Pid =:= self() ->
+ Name;
+ _Pid ->
+ exit(process_not_registered_globally)
+ end.
+
+get_parent() ->
+ case get('$ancestors') of
+ [Parent | _] when is_pid(Parent)->
+ Parent;
+ [Parent | _] when is_atom(Parent)->
+ name_to_pid(Parent);
+ _ ->
+ exit(process_was_not_started_by_proc_lib)
+ end.
+
+name_to_pid(Name) ->
+ case whereis(Name) of
+ undefined ->
+ case global:safe_whereis_name(Name) of
+ undefined ->
+ exit(could_not_find_registerd_name);
+ Pid ->
+ Pid
+ end;
+ Pid ->
+ Pid
+ end.
+
+%%-----------------------------------------------------------------
+%% Status information
+%%-----------------------------------------------------------------
+format_status(Opt, StatusData) ->
+ [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time, Queue]] =
+ StatusData,
+ NameTag = if is_pid(Name) ->
+ pid_to_list(Name);
+ is_atom(Name) ->
+ Name
+ end,
+ Header = lists:concat(["Status for generic server ", NameTag]),
+ Log = sys:get_debug(log, Debug, []),
+ Specfic =
+ case erlang:function_exported(Mod, format_status, 2) of
+ true ->
+ case catch Mod:format_status(Opt, [PDict, State]) of
+ {'EXIT', _} -> [{data, [{"State", State}]}];
+ Else -> Else
+ end;
+ _ ->
+ [{data, [{"State", State}]}]
+ end,
+ [{header, Header},
+ {data, [{"Status", SysState},
+ {"Parent", Parent},
+ {"Logged events", Log},
+ {"Queued messages", priority_queue:to_list(Queue)}]} |
+ Specfic].
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
new file mode 100644
index 00000000..732757c4
--- /dev/null
+++ b/src/priority_queue.erl
@@ -0,0 +1,153 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+%% Priority queues have essentially the same interface as ordinary
+%% queues, except that a) there is an in/3 that takes a priority, and
+%% b) we have only implemented the core API we need.
+%%
+%% Priorities should be integers - the higher the value the higher the
+%% priority - but we don't actually check that.
+%%
+%% in/2 inserts items with priority 0.
+%%
+%% We optimise the case where a priority queue is being used just like
+%% an ordinary queue. When that is the case we represent the priority
+%% queue as an ordinary queue. We could just call into the 'queue'
+%% module for that, but for efficiency we implement the relevant
+%% functions directly in here, thus saving on inter-module calls and
+%% eliminating a level of boxing.
+%%
+%% When the queue contains items with non-zero priorities, it is
+%% represented as a sorted kv list with the inverted Priority as the
+%% key and an ordinary queue as the value. Here again we use our own
+%% ordinary queue implemention for efficiency, often making recursive
+%% calls into the same function knowing that ordinary queues represent
+%% a base case.
+
+
+-module(priority_queue).
+
+-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, out/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(priority() :: integer()).
+-type(squeue() :: {queue, [any()], [any()]}).
+-type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}).
+
+-spec(new/0 :: () -> pqueue()).
+-spec(is_queue/1 :: (any()) -> bool()).
+-spec(is_empty/1 :: (pqueue()) -> bool()).
+-spec(len/1 :: (pqueue()) -> non_neg_integer()).
+-spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]).
+-spec(in/2 :: (any(), pqueue()) -> pqueue()).
+-spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()).
+-spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+new() ->
+ {queue, [], []}.
+
+is_queue({queue, R, F}) when is_list(R), is_list(F) ->
+ true;
+is_queue({pqueue, Queues}) when is_list(Queues) ->
+ lists:all(fun ({P, Q}) -> is_integer(P) andalso is_queue(Q) end,
+ Queues);
+is_queue(_) ->
+ false.
+
+is_empty({queue, [], []}) ->
+ true;
+is_empty(_) ->
+ false.
+
+len({queue, R, F}) when is_list(R), is_list(F) ->
+ length(R) + length(F);
+len({pqueue, Queues}) ->
+ lists:sum([len(Q) || {_, Q} <- Queues]).
+
+to_list({queue, In, Out}) when is_list(In), is_list(Out) ->
+ [{0, V} || V <- Out ++ lists:reverse(In, [])];
+to_list({pqueue, Queues}) ->
+ [{-P, V} || {P, Q} <- Queues, {0, V} <- to_list(Q)].
+
+in(Item, Q) ->
+ in(Item, 0, Q).
+
+in(X, 0, {queue, [_] = In, []}) ->
+ {queue, [X], In};
+in(X, 0, {queue, In, Out}) when is_list(In), is_list(Out) ->
+ {queue, [X|In], Out};
+in(X, Priority, _Q = {queue, [], []}) ->
+ in(X, Priority, {pqueue, []});
+in(X, Priority, Q = {queue, _, _}) ->
+ in(X, Priority, {pqueue, [{0, Q}]});
+in(X, Priority, {pqueue, Queues}) ->
+ P = -Priority,
+ {pqueue, case lists:keysearch(P, 1, Queues) of
+ {value, {_, Q}} ->
+ lists:keyreplace(P, 1, Queues, {P, in(X, Q)});
+ false ->
+ lists:keysort(1, [{P, {queue, [X], []}} | Queues])
+ end}.
+
+out({queue, [], []} = Q) ->
+ {empty, Q};
+out({queue, [V], []}) ->
+ {{value, V}, {queue, [], []}};
+out({queue, [Y|In], []}) ->
+ [V|Out] = lists:reverse(In, []),
+ {{value, V}, {queue, [Y], Out}};
+out({queue, In, [V]}) when is_list(In) ->
+ {{value,V}, r2f(In)};
+out({queue, In,[V|Out]}) when is_list(In) ->
+ {{value, V}, {queue, In, Out}};
+out({pqueue, [{P, Q} | Queues]}) ->
+ {R, Q1} = out(Q),
+ NewQ = case is_empty(Q1) of
+ true -> case Queues of
+ [] -> {queue, [], []};
+ [{0, OnlyQ}] -> OnlyQ;
+ [_|_] -> {pqueue, Queues}
+ end;
+ false -> {pqueue, [{P, Q1} | Queues]}
+ end,
+ {R, NewQ}.
+
+r2f([]) -> {queue, [], []};
+r2f([_] = R) -> {queue, [], R};
+r2f([X,Y]) -> {queue, [X], [Y]};
+r2f([X,Y|R]) -> {queue, [X,Y], lists:reverse(R, [])}.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index c8c814b6..1ddb5151 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -75,19 +75,20 @@ start() ->
try
ok = ensure_working_log_handlers(),
ok = rabbit_mnesia:ensure_mnesia_dir(),
- ok = start_applications(?APPS)
+ ok = rabbit_misc:start_applications(?APPS)
after
%%give the error loggers some time to catch up
timer:sleep(100)
end.
stop() ->
- ok = stop_applications(?APPS).
+ ok = rabbit_misc:stop_applications(?APPS).
stop_and_halt() ->
spawn(fun () ->
SleepTime = 1000,
- rabbit_log:info("Stop-and-halt request received; halting in ~p milliseconds~n",
+ rabbit_log:info("Stop-and-halt request received; "
+ "halting in ~p milliseconds~n",
[SleepTime]),
timer:sleep(SleepTime),
init:stop()
@@ -109,34 +110,6 @@ rotate_logs(BinarySuffix) ->
%%--------------------------------------------------------------------
-manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) ->
- Iterate(fun (App, Acc) ->
- case Do(App) of
- ok -> [App | Acc];
- {error, {SkipError, _}} -> Acc;
- {error, Reason} ->
- lists:foreach(Undo, Acc),
- throw({error, {ErrorTag, App, Reason}})
- end
- end, [], Apps),
- ok.
-
-start_applications(Apps) ->
- manage_applications(fun lists:foldl/3,
- fun application:start/1,
- fun application:stop/1,
- already_started,
- cannot_start_application,
- Apps).
-
-stop_applications(Apps) ->
- manage_applications(fun lists:foldr/3,
- fun application:stop/1,
- fun application:start/1,
- not_started,
- cannot_stop_application,
- Apps).
-
start(normal, []) ->
{ok, SupPid} = rabbit_sup:start_link(),
@@ -300,9 +273,14 @@ insert_default_data() ->
{ok, DefaultUser} = application:get_env(default_user),
{ok, DefaultPass} = application:get_env(default_pass),
{ok, DefaultVHost} = application:get_env(default_vhost),
+ {ok, [DefaultConfigurePerm, DefaultWritePerm, DefaultReadPerm]} =
+ application:get_env(default_permissions),
ok = rabbit_access_control:add_vhost(DefaultVHost),
ok = rabbit_access_control:add_user(DefaultUser, DefaultPass),
- ok = rabbit_access_control:map_user_vhost(DefaultUser, DefaultVHost),
+ ok = rabbit_access_control:set_permissions(DefaultUser, DefaultVHost,
+ DefaultConfigurePerm,
+ DefaultWritePerm,
+ DefaultReadPerm),
ok.
start_builtin_amq_applications() ->
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index b73090fc..54348d9a 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -34,11 +34,12 @@
-include("rabbit.hrl").
-export([check_login/2, user_pass_login/2,
- check_vhost_access/2]).
+ check_vhost_access/2, check_resource_access/3]).
-export([add_user/2, delete_user/1, change_password/2, list_users/0,
lookup_user/1]).
--export([add_vhost/1, delete_vhost/1, list_vhosts/0, list_vhost_users/1]).
--export([list_user_vhosts/1, map_user_vhost/2, unmap_user_vhost/2]).
+-export([add_vhost/1, delete_vhost/1, list_vhosts/0]).
+-export([set_permissions/5, clear_permissions/2,
+ list_vhost_permissions/1, list_user_permissions/1]).
%%----------------------------------------------------------------------------
@@ -47,6 +48,8 @@
-spec(check_login/2 :: (binary(), binary()) -> user()).
-spec(user_pass_login/2 :: (username(), password()) -> user()).
-spec(check_vhost_access/2 :: (user(), vhost()) -> 'ok').
+-spec(check_resource_access/3 ::
+ (username(), r(atom()), non_neg_integer()) -> 'ok').
-spec(add_user/2 :: (username(), password()) -> 'ok').
-spec(delete_user/1 :: (username()) -> 'ok').
-spec(change_password/2 :: (username(), password()) -> 'ok').
@@ -55,10 +58,13 @@
-spec(add_vhost/1 :: (vhost()) -> 'ok').
-spec(delete_vhost/1 :: (vhost()) -> 'ok').
-spec(list_vhosts/0 :: () -> [vhost()]).
--spec(list_vhost_users/1 :: (vhost()) -> [username()]).
--spec(list_user_vhosts/1 :: (username()) -> [vhost()]).
--spec(map_user_vhost/2 :: (username(), vhost()) -> 'ok').
--spec(unmap_user_vhost/2 :: (username(), vhost()) -> 'ok').
+-spec(set_permissions/5 ::
+ (username(), vhost(), regexp(), regexp(), regexp()) -> 'ok').
+-spec(clear_permissions/2 :: (username(), vhost()) -> 'ok').
+-spec(list_vhost_permissions/1 ::
+ (vhost()) -> [{username(), regexp(), regexp(), regexp()}]).
+-spec(list_user_permissions/1 ::
+ (username()) -> [{vhost(), regexp(), regexp(), regexp()}]).
-endif.
@@ -112,9 +118,9 @@ internal_lookup_vhost_access(Username, VHostPath) ->
%% TODO: use dirty ops instead
rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:match_object(
- #user_vhost{username = Username,
- virtual_host = VHostPath}) of
+ case mnesia:read({rabbit_user_permission,
+ #user_vhost{username = Username,
+ virtual_host = VHostPath}}) of
[] -> not_found;
[R] -> {ok, R}
end
@@ -131,13 +137,47 @@ check_vhost_access(#user{username = Username}, VHostPath) ->
[VHostPath, Username])
end.
+check_resource_access(Username,
+ R = #resource{kind = exchange, name = <<"">>},
+ Permission) ->
+ check_resource_access(Username,
+ R#resource{name = <<"amq.default">>},
+ Permission);
+check_resource_access(_Username,
+ #resource{name = <<"amq.gen",_/binary>>},
+ _Permission) ->
+ ok;
+check_resource_access(Username,
+ R = #resource{virtual_host = VHostPath, name = Name},
+ Permission) ->
+ Res = case mnesia:dirty_read({rabbit_user_permission,
+ #user_vhost{username = Username,
+ virtual_host = VHostPath}}) of
+ [] ->
+ false;
+ [#user_permission{permission = P}] ->
+ case regexp:match(
+ binary_to_list(Name),
+ binary_to_list(element(Permission, P))) of
+ {match, _, _} -> true;
+ nomatch -> false
+ end
+ end,
+ if Res -> ok;
+ true -> rabbit_misc:protocol_error(
+ access_refused, "access to ~s refused for user '~s'",
+ [rabbit_misc:rs(R), Username])
+ end.
+
add_user(Username, Password) ->
R = rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:read({user, Username}) of
+ case mnesia:wread({rabbit_user, Username}) of
[] ->
- ok = mnesia:write(#user{username = Username,
- password = Password});
+ ok = mnesia:write(rabbit_user,
+ #user{username = Username,
+ password = Password},
+ write);
_ ->
mnesia:abort({user_already_exists, Username})
end
@@ -150,8 +190,17 @@ delete_user(Username) ->
rabbit_misc:with_user(
Username,
fun () ->
- ok = mnesia:delete({user, Username}),
- ok = mnesia:delete({user_vhost, Username})
+ ok = mnesia:delete({rabbit_user, Username}),
+ [ok = mnesia:delete_object(
+ rabbit_user_permission, R, write) ||
+ R <- mnesia:match_object(
+ rabbit_user_permission,
+ #user_permission{user_vhost = #user_vhost{
+ username = Username,
+ virtual_host = '_'},
+ permission = '_'},
+ write)],
+ ok
end)),
rabbit_log:info("Deleted user ~p~n", [Username]),
R.
@@ -161,24 +210,28 @@ change_password(Username, Password) ->
rabbit_misc:with_user(
Username,
fun () ->
- ok = mnesia:write(#user{username = Username,
- password = Password})
+ ok = mnesia:write(rabbit_user,
+ #user{username = Username,
+ password = Password},
+ write)
end)),
rabbit_log:info("Changed password for user ~p~n", [Username]),
R.
list_users() ->
- mnesia:dirty_all_keys(user).
+ mnesia:dirty_all_keys(rabbit_user).
lookup_user(Username) ->
- rabbit_misc:dirty_read({user, Username}).
+ rabbit_misc:dirty_read({rabbit_user, Username}).
add_vhost(VHostPath) ->
R = rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:read({vhost, VHostPath}) of
+ case mnesia:wread({rabbit_vhost, VHostPath}) of
[] ->
- ok = mnesia:write(#vhost{virtual_host = VHostPath}),
+ ok = mnesia:write(rabbit_vhost,
+ #vhost{virtual_host = VHostPath},
+ write),
[rabbit_exchange:declare(
rabbit_misc:r(VHostPath, exchange, Name),
Type, true, false, []) ||
@@ -186,6 +239,8 @@ add_vhost(VHostPath) ->
[{<<"">>, direct},
{<<"amq.direct">>, direct},
{<<"amq.topic">>, topic},
+ {<<"amq.match">>, headers}, %% per 0-9-1 pdf
+ {<<"amq.headers">>, headers}, %% per 0-9-1 xml
{<<"amq.fanout">>, fanout}]],
ok;
[_] ->
@@ -218,53 +273,79 @@ internal_delete_vhost(VHostPath) ->
ok = rabbit_exchange:delete(Name, false)
end,
rabbit_exchange:list(VHostPath)),
- lists:foreach(fun (Username) ->
- ok = unmap_user_vhost(Username, VHostPath)
+ lists:foreach(fun ({Username, _, _, _}) ->
+ ok = clear_permissions(Username, VHostPath)
end,
- list_vhost_users(VHostPath)),
- ok = mnesia:delete({vhost, VHostPath}),
+ list_vhost_permissions(VHostPath)),
+ ok = mnesia:delete({rabbit_vhost, VHostPath}),
ok.
list_vhosts() ->
- mnesia:dirty_all_keys(vhost).
+ mnesia:dirty_all_keys(rabbit_vhost).
-list_vhost_users(VHostPath) ->
- [Username ||
- #user_vhost{username = Username} <-
- %% TODO: use dirty ops instead
- rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_vhost(
- VHostPath,
- fun () -> mnesia:index_read(user_vhost, VHostPath,
- #user_vhost.virtual_host)
- end))].
-
-list_user_vhosts(Username) ->
- [VHostPath ||
- #user_vhost{virtual_host = VHostPath} <-
- %% TODO: use dirty ops instead
- rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_user(
- Username,
- fun () -> mnesia:read({user_vhost, Username}) end))].
+validate_regexp(RegexpBin) ->
+ Regexp = binary_to_list(RegexpBin),
+ case regexp:parse(Regexp) of
+ {ok, _} -> ok;
+ {error, Reason} -> throw({error, {invalid_regexp, Regexp, Reason}})
+ end.
-map_user_vhost(Username, VHostPath) ->
+set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
+ lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]),
rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_user_and_vhost(
Username, VHostPath,
- fun () ->
- ok = mnesia:write(
- #user_vhost{username = Username,
- virtual_host = VHostPath})
+ fun () -> ok = mnesia:write(
+ rabbit_user_permission,
+ #user_permission{user_vhost = #user_vhost{
+ username = Username,
+ virtual_host = VHostPath},
+ permission = #permission{
+ configure = ConfigurePerm,
+ write = WritePerm,
+ read = ReadPerm}},
+ write)
end)).
-unmap_user_vhost(Username, VHostPath) ->
+clear_permissions(Username, VHostPath) ->
rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_user_and_vhost(
Username, VHostPath,
fun () ->
- ok = mnesia:delete_object(
- #user_vhost{username = Username,
- virtual_host = VHostPath})
+ ok = mnesia:delete({rabbit_user_permission,
+ #user_vhost{username = Username,
+ virtual_host = VHostPath}})
end)).
+list_vhost_permissions(VHostPath) ->
+ [{Username, ConfigurePerm, WritePerm, ReadPerm} ||
+ {Username, _, ConfigurePerm, WritePerm, ReadPerm} <-
+ list_permissions(rabbit_misc:with_vhost(
+ VHostPath, match_user_vhost('_', VHostPath)))].
+
+list_user_permissions(Username) ->
+ [{VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
+ {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <-
+ list_permissions(rabbit_misc:with_user(
+ Username, match_user_vhost(Username, '_')))].
+
+list_permissions(QueryThunk) ->
+ [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
+ #user_permission{user_vhost = #user_vhost{username = Username,
+ virtual_host = VHostPath},
+ permission = #permission{
+ configure = ConfigurePerm,
+ write = WritePerm,
+ read = ReadPerm}} <-
+ %% TODO: use dirty ops instead
+ rabbit_misc:execute_mnesia_transaction(QueryThunk)].
+
+match_user_vhost(Username, VHostPath) ->
+ fun () -> mnesia:match_object(
+ rabbit_user_permission,
+ #user_permission{user_vhost = #user_vhost{
+ username = Username,
+ virtual_host = VHostPath},
+ permission = '_'},
+ read)
+ end.
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index dee71d23..21999f16 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -53,7 +53,7 @@
-spec(start/1 :: (bool() | 'auto') -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(register/2 :: (pid(), mfa_tuple()) -> 'ok').
-
+
-endif.
%%----------------------------------------------------------------------------
@@ -78,7 +78,8 @@ stop() ->
register(Pid, HighMemMFA) ->
ok = gen_event:call(alarm_handler, ?MODULE,
- {register, Pid, HighMemMFA}).
+ {register, Pid, HighMemMFA},
+ infinity).
%%----------------------------------------------------------------------------
@@ -101,7 +102,7 @@ handle_call({register, Pid, HighMemMFA},
end,
NewAlertees = dict:store(Pid, HighMemMFA, Alertess),
{ok, ok, State#alarms{alertees = NewAlertees}};
-
+
handle_call(_Request, State) ->
{ok, not_understood, State}.
@@ -135,7 +136,7 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
start_memsup() ->
- Mod = case os:type() of
+ Mod = case os:type() of
%% memsup doesn't take account of buffers or cache when
%% considering "free" memory - therefore on Linux we can
%% get memory alarms very easily without any pressure
@@ -143,7 +144,7 @@ start_memsup() ->
%% our own simple memory monitor.
%%
{unix, linux} -> rabbit_memsup_linux;
-
+
%% Start memsup programmatically rather than via the
%% rabbitmq-server script. This is not quite the right
%% thing to do as os_mon checks to see if memsup is
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 382810c3..eb076e94 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -37,13 +37,13 @@
stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]).
-export([list/1, info/1, info/2, info_all/1, info_all/2]).
-export([claim_queue/2]).
--export([basic_get/3, basic_consume/7, basic_cancel/4]).
--export([notify_sent/2]).
--export([commit_all/2, rollback_all/2, notify_down_all/2]).
+-export([basic_get/3, basic_consume/8, basic_cancel/4]).
+-export([notify_sent/2, unblock/2]).
+-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-import(mnesia).
--import(gen_server).
+-import(gen_server2).
-import(lists).
-import(queue).
@@ -91,15 +91,17 @@
-spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()).
-spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()).
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
+-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
-spec(basic_get/3 :: (amqqueue(), pid(), bool()) ->
{'ok', non_neg_integer(), msg()} | 'empty').
--spec(basic_consume/7 ::
- (amqqueue(), bool(), pid(), pid(), ctag(), bool(), any()) ->
+-spec(basic_consume/8 ::
+ (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) ->
'ok' | {'error', 'queue_owned_by_another_connection' |
'exclusive_consume_unavailable'}).
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
+-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
@@ -130,7 +132,7 @@ recover_durable_queues() ->
%% re-created it).
case rabbit_misc:execute_mnesia_transaction(
fun () -> case mnesia:match_object(
- durable_queues, RecoveredQ, read) of
+ rabbit_durable_queue, RecoveredQ, read) of
[_] -> ok = store_queue(Q),
true;
[] -> false
@@ -144,7 +146,7 @@ recover_durable_queues() ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
- <- mnesia:table(durable_queues),
+ <- mnesia:table(rabbit_durable_queue),
node(Pid) == Node]))
end)),
ok.
@@ -157,7 +159,7 @@ declare(QueueName, Durable, AutoDelete, Args) ->
pid = none}),
case rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:wread({amqqueue, QueueName}) of
+ case mnesia:wread({rabbit_queue, QueueName}) of
[] -> ok = store_queue(Q),
ok = add_default_binding(Q),
Q;
@@ -170,11 +172,11 @@ declare(QueueName, Durable, AutoDelete, Args) ->
end.
store_queue(Q = #amqqueue{durable = true}) ->
- ok = mnesia:write(durable_queues, Q, write),
- ok = mnesia:write(Q),
+ ok = mnesia:write(rabbit_durable_queue, Q, write),
+ ok = mnesia:write(rabbit_queue, Q, write),
ok;
store_queue(Q = #amqqueue{durable = false}) ->
- ok = mnesia:write(Q),
+ ok = mnesia:write(rabbit_queue, Q, write),
ok.
start_queue_process(Q) ->
@@ -188,7 +190,7 @@ add_default_binding(#amqqueue{name = QueueName}) ->
ok.
lookup(Name) ->
- rabbit_misc:dirty_read({amqqueue, Name}).
+ rabbit_misc:dirty_read({rabbit_queue, Name}).
with(Name, F, E) ->
case lookup(Name) of
@@ -205,15 +207,16 @@ with_or_die(Name, F) ->
list(VHostPath) ->
mnesia:dirty_match_object(
+ rabbit_queue,
#amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'}).
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
info(#amqqueue{ pid = QPid }) ->
- gen_server:call(QPid, info).
+ gen_server2:pcall(QPid, 9, info, infinity).
info(#amqqueue{ pid = QPid }, Items) ->
- case gen_server:call(QPid, {info, Items}) of
+ case gen_server2:pcall(QPid, 9, {info, Items}, infinity) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -222,82 +225,91 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
-stat(#amqqueue{pid = QPid}) -> gen_server:call(QPid, stat).
+stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity).
stat_all() ->
- lists:map(fun stat/1, rabbit_misc:dirty_read_all(amqqueue)).
+ lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)).
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
- gen_server:call(QPid, {delete, IfUnused, IfEmpty}).
+ gen_server2:call(QPid, {delete, IfUnused, IfEmpty}, infinity).
-purge(#amqqueue{ pid = QPid }) -> gen_server:call(QPid, purge).
+purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity).
deliver(_IsMandatory, true, Txn, Message, QPid) ->
- gen_server:call(QPid, {deliver_immediately, Txn, Message});
+ gen_server2:call(QPid, {deliver_immediately, Txn, Message}, infinity);
deliver(true, _IsImmediate, Txn, Message, QPid) ->
- gen_server:call(QPid, {deliver, Txn, Message}),
+ gen_server2:call(QPid, {deliver, Txn, Message}, infinity),
true;
deliver(false, _IsImmediate, Txn, Message, QPid) ->
- gen_server:cast(QPid, {deliver, Txn, Message}),
+ gen_server2:cast(QPid, {deliver, Txn, Message}),
true.
redeliver(QPid, Messages) ->
- gen_server:cast(QPid, {redeliver, Messages}).
+ gen_server2:cast(QPid, {redeliver, Messages}).
requeue(QPid, MsgIds, ChPid) ->
- gen_server:cast(QPid, {requeue, MsgIds, ChPid}).
+ gen_server2:cast(QPid, {requeue, MsgIds, ChPid}).
ack(QPid, Txn, MsgIds, ChPid) ->
- gen_server:cast(QPid, {ack, Txn, MsgIds, ChPid}).
+ gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}).
commit_all(QPids, Txn) ->
- Timeout = length(QPids) * ?CALL_TIMEOUT,
safe_pmap_ok(
fun (QPid) -> exit({queue_disappeared, QPid}) end,
- fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end,
+ fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, infinity) end,
QPids).
rollback_all(QPids, Txn) ->
safe_pmap_ok(
fun (QPid) -> exit({queue_disappeared, QPid}) end,
- fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end,
+ fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn}) end,
QPids).
notify_down_all(QPids, ChPid) ->
- Timeout = length(QPids) * ?CALL_TIMEOUT,
safe_pmap_ok(
%% we don't care if the queue process has terminated in the
%% meantime
fun (_) -> ok end,
- fun (QPid) -> gen_server:call(QPid, {notify_down, ChPid}, Timeout) end,
+ fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end,
QPids).
+limit_all(QPids, ChPid, LimiterPid) ->
+ safe_pmap_ok(
+ fun (_) -> ok end,
+ fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end,
+ QPids).
+
claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
- gen_server:call(QPid, {claim_queue, ReaderPid}).
+ gen_server2:call(QPid, {claim_queue, ReaderPid}, infinity).
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
- gen_server:call(QPid, {basic_get, ChPid, NoAck}).
+ gen_server2:call(QPid, {basic_get, ChPid, NoAck}, infinity).
-basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid,
+basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg) ->
- gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
- ConsumerTag, ExclusiveConsume, OkMsg}).
+ gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
+ LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg},
+ infinity).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
- ok = gen_server:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
+ ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg},
+ infinity).
notify_sent(QPid, ChPid) ->
- gen_server:cast(QPid, {notify_sent, ChPid}).
+ gen_server2:cast(QPid, {notify_sent, ChPid}).
+
+unblock(QPid, ChPid) ->
+ gen_server2:cast(QPid, {unblock, ChPid}).
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:wread({amqqueue, QueueName}) of
+ case mnesia:wread({rabbit_queue, QueueName}) of
[] -> {error, not_found};
[_] ->
ok = rabbit_exchange:delete_queue_bindings(QueueName),
- ok = mnesia:delete({amqqueue, QueueName}),
- ok = mnesia:delete({durable_queues, QueueName}),
+ ok = mnesia:delete({rabbit_queue, QueueName}),
+ ok = mnesia:delete({rabbit_durable_queue, QueueName}),
ok
end
end).
@@ -309,12 +321,12 @@ on_node_down(Node) ->
fun (QueueName, Acc) ->
ok = rabbit_exchange:delete_transient_queue_bindings(
QueueName),
- ok = mnesia:delete({amqqueue, QueueName}),
+ ok = mnesia:delete({rabbit_queue, QueueName}),
Acc
end,
ok,
qlc:q([QueueName || #amqqueue{name = QueueName, pid = Pid}
- <- mnesia:table(amqqueue),
+ <- mnesia:table(rabbit_queue),
node(Pid) == Node]))
end).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6282a8fb..c390b2b7 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -33,7 +33,7 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--behaviour(gen_server).
+-behaviour(gen_server2).
-define(UNSENT_MESSAGE_LIMIT, 100).
-define(HIBERNATE_AFTER, 1000).
@@ -62,9 +62,10 @@
%% These are held in our process dictionary
-record(cr, {consumers,
ch_pid,
+ limiter_pid,
monitor_ref,
unacked_messages,
- is_overload_protection_active,
+ is_limit_active,
unsent_message_count}).
-define(INFO_KEYS,
@@ -85,7 +86,7 @@
%%----------------------------------------------------------------------------
start_link(Q) ->
- gen_server:start_link(?MODULE, Q, []).
+ gen_server2:start_link(?MODULE, Q, []).
%%----------------------------------------------------------------------------
@@ -131,7 +132,7 @@ ch_record(ChPid) ->
ch_pid = ChPid,
monitor_ref = MonitorRef,
unacked_messages = dict:new(),
- is_overload_protection_active = false,
+ is_limit_active = false,
unsent_message_count = 0},
put(Key, C),
C;
@@ -144,20 +145,16 @@ store_ch_record(C = #cr{ch_pid = ChPid}) ->
all_ch_record() ->
[C || {{ch, _}, C} <- get()].
-update_store_and_maybe_block_ch(
- C = #cr{is_overload_protection_active = Active,
- unsent_message_count = Count}) ->
- {Result, NewActive} =
- if
- not(Active) and (Count > ?UNSENT_MESSAGE_LIMIT) ->
- {block_ch, true};
- Active and (Count == 0) ->
- {unblock_ch, false};
- true ->
- {ok, Active}
- end,
- store_ch_record(C#cr{is_overload_protection_active = NewActive}),
- Result.
+is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
+ Limited orelse Count > ?UNSENT_MESSAGE_LIMIT.
+
+ch_record_state_transition(OldCR, NewCR) ->
+ BlockedOld = is_ch_blocked(OldCR),
+ BlockedNew = is_ch_blocked(NewCR),
+ if BlockedOld andalso not(BlockedNew) -> unblock;
+ BlockedNew andalso not(BlockedOld) -> block;
+ true -> ok
+ end.
deliver_immediately(Message, Delivered,
State = #q{q = #amqqueue{name = QName},
@@ -168,26 +165,37 @@ deliver_immediately(Message, Delivered,
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}},
RoundRobinTail} ->
- rabbit_channel:deliver(
- ChPid, ConsumerTag, AckRequired,
- {QName, self(), NextId, Delivered, Message}),
- C = #cr{unsent_message_count = Count,
+ C = #cr{limiter_pid = LimiterPid,
+ unsent_message_count = Count,
unacked_messages = UAM} = ch_record(ChPid),
- NewUAM = case AckRequired of
- true -> dict:store(NextId, Message, UAM);
- false -> UAM
- end,
- NewConsumers =
- case update_store_and_maybe_block_ch(
- C#cr{unsent_message_count = Count + 1,
- unacked_messages = NewUAM}) of
- ok -> queue:in(QEntry, RoundRobinTail);
- block_ch -> block_consumers(ChPid, RoundRobinTail)
- end,
- {offered, AckRequired, State#q{round_robin = NewConsumers,
- next_msg_id = NextId +1}};
+ case not(AckRequired) orelse rabbit_limiter:can_send(
+ LimiterPid, self()) of
+ true ->
+ rabbit_channel:deliver(
+ ChPid, ConsumerTag, AckRequired,
+ {QName, self(), NextId, Delivered, Message}),
+ NewUAM = case AckRequired of
+ true -> dict:store(NextId, Message, UAM);
+ false -> UAM
+ end,
+ NewC = C#cr{unsent_message_count = Count + 1,
+ unacked_messages = NewUAM},
+ store_ch_record(NewC),
+ NewConsumers =
+ case ch_record_state_transition(C, NewC) of
+ ok -> queue:in(QEntry, RoundRobinTail);
+ block -> block_consumers(ChPid, RoundRobinTail)
+ end,
+ {offered, AckRequired, State#q{round_robin = NewConsumers,
+ next_msg_id = NextId + 1}};
+ false ->
+ store_ch_record(C#cr{is_limit_active = true}),
+ NewConsumers = block_consumers(ChPid, RoundRobinTail),
+ deliver_immediately(Message, Delivered,
+ State#q{round_robin = NewConsumers})
+ end;
{empty, _} ->
- not_offered
+ {not_offered, State}
end.
attempt_delivery(none, Message, State) ->
@@ -198,8 +206,8 @@ attempt_delivery(none, Message, State) ->
persist_message(none, qname(State), Message),
persist_delivery(qname(State), Message, false),
{true, State1};
- not_offered ->
- {false, State}
+ {not_offered, State1} ->
+ {false, State1}
end;
attempt_delivery(Txn, Message, State) ->
persist_message(Txn, qname(State), Message),
@@ -237,16 +245,22 @@ block_consumer(ChPid, ConsumerTag, RoundRobin) ->
(CP /= ChPid) or (CT /= ConsumerTag)
end, queue:to_list(RoundRobin))).
-possibly_unblock(C = #cr{consumers = Consumers, ch_pid = ChPid},
- State = #q{round_robin = RoundRobin}) ->
- case update_store_and_maybe_block_ch(C) of
- ok ->
+possibly_unblock(State, ChPid, Update) ->
+ case lookup_ch(ChPid) of
+ not_found ->
State;
- unblock_ch ->
- run_poke_burst(State#q{round_robin =
- unblock_consumers(ChPid, Consumers, RoundRobin)})
+ C ->
+ NewC = Update(C),
+ store_ch_record(NewC),
+ case ch_record_state_transition(C, NewC) of
+ ok -> State;
+ unblock -> NewRR = unblock_consumers(ChPid,
+ NewC#cr.consumers,
+ State#q.round_robin),
+ run_poke_burst(State#q{round_robin = NewRR})
+ end
end.
-
+
check_auto_delete(State = #q{q = #amqqueue{auto_delete = false}}) ->
{continue, State};
check_auto_delete(State = #q{has_had_consumers = false}) ->
@@ -301,7 +315,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
{stop, normal, NewState}
end
end.
-
+
cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) ->
none;
cancel_holder(_ChPid, _ConsumerTag, Holder) ->
@@ -334,8 +348,8 @@ run_poke_burst(MessageBuffer, State) ->
{offered, false, NewState} ->
persist_auto_ack(qname(State), Message),
run_poke_burst(BufferTail, NewState);
- not_offered ->
- State#q{message_buffer = MessageBuffer}
+ {not_offered, NewState} ->
+ NewState#q{message_buffer = MessageBuffer}
end;
{empty, _} ->
State#q{message_buffer = MessageBuffer}
@@ -500,8 +514,8 @@ i(messages_uncommitted, _) ->
#tx{pending_messages = Pending} <- all_tx_record()]);
i(messages, State) ->
lists:sum([i(Item, State) || Item <- [messages_ready,
- messages_unacknowledged,
- messages_uncommitted]]);
+ messages_unacknowledged,
+ messages_uncommitted]]);
i(acks_uncommitted, _) ->
lists:sum([length(Pending) ||
#tx{pending_acks = Pending} <- all_tx_record()]);
@@ -552,14 +566,14 @@ handle_call({deliver, Txn, Message}, _From, State) ->
handle_call({commit, Txn}, From, State) ->
ok = commit_work(Txn, qname(State)),
%% optimisation: we reply straight away so the sender can continue
- gen_server:reply(From, ok),
+ gen_server2:reply(From, ok),
NewState = process_pending(Txn, State),
erase_tx(Txn),
noreply(NewState);
handle_call({notify_down, ChPid}, From, State) ->
%% optimisation: we reply straight away so the sender can continue
- gen_server:reply(From, ok),
+ gen_server2:reply(From, ok),
handle_ch_down(ChPid, State);
handle_call({basic_get, ChPid, NoAck}, _From,
@@ -586,8 +600,8 @@ handle_call({basic_get, ChPid, NoAck}, _From,
reply(empty, State)
end;
-handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
- ExclusiveConsume, OkMsg},
+handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
+ ConsumerTag, ExclusiveConsume, OkMsg},
_From, State = #q{owner = Owner,
exclusive_consumer = ExistingHolder,
round_robin = RoundRobin}) ->
@@ -601,8 +615,13 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
ok ->
C = #cr{consumers = Consumers} = ch_record(ChPid),
Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)},
- C1 = C#cr{consumers = [Consumer | Consumers]},
- store_ch_record(C1),
+ store_ch_record(C#cr{consumers = [Consumer | Consumers],
+ limiter_pid = LimiterPid}),
+ if Consumers == [] ->
+ ok = rabbit_limiter:register(LimiterPid, self());
+ true ->
+ ok
+ end,
State1 = State#q{has_had_consumers = true,
exclusive_consumer =
if
@@ -622,12 +641,16 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
not_found ->
ok = maybe_send_reply(ChPid, OkMsg),
reply(ok, State);
- C = #cr{consumers = Consumers} ->
+ C = #cr{consumers = Consumers, limiter_pid = LimiterPid} ->
NewConsumers = lists:filter
(fun (#consumer{tag = CT}) -> CT /= ConsumerTag end,
Consumers),
- C1 = C#cr{consumers = NewConsumers},
- store_ch_record(C1),
+ store_ch_record(C#cr{consumers = NewConsumers}),
+ if NewConsumers == [] ->
+ ok = rabbit_limiter:unregister(LimiterPid, self());
+ true ->
+ ok
+ end,
ok = maybe_send_reply(ChPid, OkMsg),
case check_auto_delete(
State#q{exclusive_consumer = cancel_holder(ChPid,
@@ -730,14 +753,33 @@ handle_cast({requeue, MsgIds, ChPid}, State) ->
[{Message, true} || Message <- Messages], State))
end;
+handle_cast({unblock, ChPid}, State) ->
+ noreply(
+ possibly_unblock(State, ChPid,
+ fun (C) -> C#cr{is_limit_active = false} end));
+
handle_cast({notify_sent, ChPid}, State) ->
- case lookup_ch(ChPid) of
- not_found -> noreply(State);
- T = #cr{unsent_message_count =Count} ->
- noreply(possibly_unblock(
- T#cr{unsent_message_count = Count - 1},
- State))
- end.
+ noreply(
+ possibly_unblock(State, ChPid,
+ fun (C = #cr{unsent_message_count = Count}) ->
+ C#cr{unsent_message_count = Count - 1}
+ end));
+
+handle_cast({limit, ChPid, LimiterPid}, State) ->
+ noreply(
+ possibly_unblock(
+ State, ChPid,
+ fun (C = #cr{consumers = Consumers,
+ limiter_pid = OldLimiterPid,
+ is_limit_active = Limited}) ->
+ if Consumers =/= [] andalso OldLimiterPid == undefined ->
+ ok = rabbit_limiter:register(LimiterPid, self());
+ true ->
+ ok
+ end,
+ NewLimited = Limited andalso LimiterPid =/= undefined,
+ C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
+ end)).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
@@ -758,7 +800,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_info(timeout, State) ->
%% TODO: Once we drop support for R11B-5, we can change this to
%% {noreply, State, hibernate};
- proc_lib:hibernate(gen_server, enter_loop, [?MODULE, [], State]);
+ proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]);
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 5fd9a512..b2716ec4 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -33,23 +33,29 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/4, do/2, do/3, shutdown/1]).
+-behaviour(gen_server2).
+
+-export([start_link/5, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, conserve_memory/2]).
-%% callbacks
--export([init/2, handle_message/2]).
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
--record(ch, {state, proxy_pid, reader_pid, writer_pid,
+-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host,
most_recently_declared_queue, consumer_mapping}).
+-define(HIBERNATE_AFTER, 1000).
+
+-define(MAX_PERMISSION_CACHE_SIZE, 12).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(start_link/4 :: (pid(), pid(), username(), vhost()) -> pid()).
+-spec(start_link/5 ::
+ (channel_number(), pid(), pid(), username(), vhost()) -> pid()).
-spec(do/2 :: (pid(), amqp_method()) -> 'ok').
-spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
@@ -61,112 +67,126 @@
%%----------------------------------------------------------------------------
-start_link(ReaderPid, WriterPid, Username, VHost) ->
- buffering_proxy:start_link(?MODULE, [ReaderPid, WriterPid,
- Username, VHost]).
+start_link(Channel, ReaderPid, WriterPid, Username, VHost) ->
+ {ok, Pid} = gen_server2:start_link(
+ ?MODULE, [Channel, ReaderPid, WriterPid,
+ Username, VHost], []),
+ Pid.
do(Pid, Method) ->
do(Pid, Method, none).
do(Pid, Method, Content) ->
- Pid ! {method, Method, Content},
- ok.
+ gen_server2:cast(Pid, {method, Method, Content}).
shutdown(Pid) ->
- Pid ! terminate,
- ok.
+ gen_server2:cast(Pid, terminate).
send_command(Pid, Msg) ->
- Pid ! {command, Msg},
- ok.
+ gen_server2:cast(Pid, {command, Msg}).
deliver(Pid, ConsumerTag, AckRequired, Msg) ->
- Pid ! {deliver, ConsumerTag, AckRequired, Msg},
- ok.
+ gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}).
conserve_memory(Pid, Conserve) ->
- Pid ! {conserve_memory, Conserve},
- ok.
+ gen_server2:cast(Pid, {conserve_memory, Conserve}).
%%---------------------------------------------------------------------------
-init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) ->
+init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
process_flag(trap_exit, true),
link(WriterPid),
- %% this is bypassing the proxy so alarms can "jump the queue" and
- %% be handled promptly
rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
- #ch{state = starting,
- proxy_pid = ProxyPid,
- reader_pid = ReaderPid,
- writer_pid = WriterPid,
- transaction_id = none,
- tx_participants = sets:new(),
- next_tag = 1,
- uncommitted_ack_q = queue:new(),
- unacked_message_q = queue:new(),
- username = Username,
- virtual_host = VHost,
- most_recently_declared_queue = <<>>,
- consumer_mapping = dict:new()}.
-
-handle_message({method, Method, Content}, State) ->
+ {ok, #ch{state = starting,
+ channel = Channel,
+ reader_pid = ReaderPid,
+ writer_pid = WriterPid,
+ limiter_pid = undefined,
+ transaction_id = none,
+ tx_participants = sets:new(),
+ next_tag = 1,
+ uncommitted_ack_q = queue:new(),
+ unacked_message_q = queue:new(),
+ username = Username,
+ virtual_host = VHost,
+ most_recently_declared_queue = <<>>,
+ consumer_mapping = dict:new()}}.
+
+handle_call(_Request, _From, State) ->
+ noreply(State).
+
+handle_cast({method, Method, Content}, State) ->
try handle_method(Method, Content, State) of
{reply, Reply, NewState} ->
ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply),
- NewState;
+ noreply(NewState);
{noreply, NewState} ->
- NewState;
+ noreply(NewState);
stop ->
- exit(normal)
+ {stop, normal, State#ch{state = terminating}}
catch
exit:{amqp, Error, Explanation, none} ->
- terminate({amqp, Error, Explanation,
- rabbit_misc:method_record_type(Method)},
- State);
+ ok = notify_queues(internal_rollback(State)),
+ Reason = {amqp, Error, Explanation,
+ rabbit_misc:method_record_type(Method)},
+ State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
+ {stop, normal, State#ch{state = terminating}};
exit:normal ->
- terminate(normal, State);
+ {stop, normal, State};
_:Reason ->
- terminate({Reason, erlang:get_stacktrace()}, State)
+ {stop, {Reason, erlang:get_stacktrace()}, State}
end;
-handle_message(terminate, State) ->
- terminate(normal, State);
+handle_cast(terminate, State) ->
+ {stop, normal, State};
-handle_message({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
+handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, Msg),
- State;
+ noreply(State);
-handle_message({deliver, ConsumerTag, AckRequired, Msg},
- State = #ch{proxy_pid = ProxyPid,
- writer_pid = WriterPid,
- next_tag = DeliveryTag}) ->
+handle_cast({deliver, ConsumerTag, AckRequired, Msg},
+ State = #ch{writer_pid = WriterPid,
+ next_tag = DeliveryTag}) ->
State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State),
- ok = internal_deliver(WriterPid, ProxyPid,
- true, ConsumerTag, DeliveryTag, Msg),
- State1#ch{next_tag = DeliveryTag + 1};
+ ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg),
+ noreply(State1#ch{next_tag = DeliveryTag + 1});
-handle_message({conserve_memory, Conserve}, State) ->
+handle_cast({conserve_memory, Conserve}, State) ->
+ ok = clear_permission_cache(),
ok = rabbit_writer:send_command(
State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}),
- State;
+ noreply(State).
-handle_message({'EXIT', _Pid, Reason}, State) ->
- terminate(Reason, State);
+handle_info({'EXIT', _Pid, Reason}, State) ->
+ {stop, Reason, State};
-handle_message(Other, State) ->
- terminate({unexpected_channel_message, Other}, State).
+handle_info(timeout, State) ->
+ ok = clear_permission_cache(),
+ %% TODO: Once we drop support for R11B-5, we can change this to
+ %% {noreply, State, hibernate};
+ proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]).
-%%---------------------------------------------------------------------------
+terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid,
+ state = terminating}) ->
+ rabbit_writer:shutdown(WriterPid),
+ rabbit_limiter:shutdown(LimiterPid);
-terminate(Reason, State = #ch{writer_pid = WriterPid}) ->
+terminate(Reason, State = #ch{writer_pid = WriterPid,
+ limiter_pid = LimiterPid}) ->
Res = notify_queues(internal_rollback(State)),
case Reason of
normal -> ok = Res;
_ -> ok
end,
rabbit_writer:shutdown(WriterPid),
- exit(Reason).
+ rabbit_limiter:shutdown(LimiterPid).
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%---------------------------------------------------------------------------
+
+noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}.
return_ok(State, true, _Msg) -> {noreply, State};
return_ok(State, false, Msg) -> {reply, Msg, State}.
@@ -190,6 +210,35 @@ return_queue_declare_ok(State, NoWait, Q) ->
{reply, Reply, NewState}
end.
+check_resource_access(Username, Resource, Perm) ->
+ V = {Resource, Perm},
+ Cache = case get(permission_cache) of
+ undefined -> [];
+ Other -> Other
+ end,
+ CacheTail =
+ case lists:member(V, Cache) of
+ true -> lists:delete(V, Cache);
+ false -> ok = rabbit_access_control:check_resource_access(
+ Username, Resource, Perm),
+ lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE - 1)
+ end,
+ put(permission_cache, [V | CacheTail]),
+ ok.
+
+clear_permission_cache() ->
+ erase(permission_cache),
+ ok.
+
+check_configure_permitted(Resource, #ch{ username = Username}) ->
+ check_resource_access(Username, Resource, #permission.configure).
+
+check_write_permitted(Resource, #ch{ username = Username}) ->
+ check_resource_access(Username, Resource, #permission.write).
+
+check_read_permitted(Resource, #ch{ username = Username}) ->
+ check_resource_access(Username, Resource, #permission.read).
+
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
not_allowed, "no previously declared queue", []);
@@ -248,7 +297,6 @@ handle_method(_Method, _, #ch{state = starting}) ->
handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
ok = notify_queues(internal_rollback(State)),
ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}),
- ok = rabbit_writer:shutdown(WriterPid),
stop;
handle_method(#'access.request'{},_, State) ->
@@ -260,6 +308,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
immediate = Immediate},
Content, State = #ch{ virtual_host = VHostPath}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
@@ -273,7 +322,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
content = DecodedContent,
persistent_key = PersistentKey},
- rabbit_exchange:route(Exchange, RoutingKey), State)};
+ rabbit_exchange:route(Exchange, RoutingKey, DecodedContent), State)};
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
@@ -286,9 +335,10 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
true -> ok
end,
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
- Participants = ack(State#ch.proxy_pid, TxnKey, Acked),
+ Participants = ack(TxnKey, Acked),
{noreply, case TxnKey of
- none -> State#ch{unacked_message_q = Remaining};
+ none -> ok = notify_limiter(State#ch.limiter_pid, Acked),
+ State#ch{unacked_message_q = Remaining};
_ -> NewUAQ = queue:join(State#ch.uncommitted_ack_q,
Acked),
add_tx_participants(
@@ -299,12 +349,13 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
- _, State = #ch{ proxy_pid = ProxyPid, writer_pid = WriterPid,
+ _, State = #ch{ writer_pid = WriterPid,
next_tag = DeliveryTag }) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ check_read_permitted(QueueName, State),
case rabbit_amqqueue:with_or_die(
QueueName,
- fun (Q) -> rabbit_amqqueue:basic_get(Q, ProxyPid, NoAck) end) of
+ fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
Msg = {_QName, _QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
@@ -330,12 +381,13 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
no_ack = NoAck,
exclusive = ExclusiveConsume,
nowait = NoWait},
- _, State = #ch{ proxy_pid = ProxyPid,
- reader_pid = ReaderPid,
+ _, State = #ch{ reader_pid = ReaderPid,
+ limiter_pid = LimiterPid,
consumer_mapping = ConsumerMapping }) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ check_read_permitted(QueueName, State),
ActualConsumerTag =
case ConsumerTag of
<<>> -> rabbit_guid:binstring_guid("amq.ctag");
@@ -349,7 +401,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
QueueName,
fun (Q) ->
rabbit_amqqueue:basic_consume(
- Q, NoAck, ReaderPid, ProxyPid,
+ Q, NoAck, ReaderPid, self(), LimiterPid,
ActualConsumerTag, ExclusiveConsume,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag}))
@@ -380,8 +432,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
nowait = NoWait},
- _, State = #ch{ proxy_pid = ProxyPid,
- consumer_mapping = ConsumerMapping }) ->
+ _, State = #ch{consumer_mapping = ConsumerMapping }) ->
OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag},
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
@@ -402,7 +453,7 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
%% cancel_ok ourselves it might overtake a
%% message sent previously by the queue.
rabbit_amqqueue:basic_cancel(
- Q, ProxyPid, ConsumerTag,
+ Q, self(), ConsumerTag,
ok_msg(NoWait, #'basic.cancel_ok'{
consumer_tag = ConsumerTag}))
end) of
@@ -414,13 +465,34 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
end
end;
-handle_method(#'basic.qos'{}, _, State) ->
- %% FIXME: Need to implement QOS
- {reply, #'basic.qos_ok'{}, State};
+handle_method(#'basic.qos'{global = true}, _, _State) ->
+ rabbit_misc:protocol_error(not_implemented, "global=true", []);
+
+handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
+ rabbit_misc:protocol_error(not_implemented,
+ "prefetch_size!=0 (~w)", [Size]);
+
+handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
+ _, State = #ch{ limiter_pid = LimiterPid }) ->
+ NewLimiterPid = case {LimiterPid, PrefetchCount} of
+ {undefined, 0} ->
+ undefined;
+ {undefined, _} ->
+ LPid = rabbit_limiter:start_link(self()),
+ ok = limit_queues(LPid, State),
+ LPid;
+ {_, 0} ->
+ ok = rabbit_limiter:shutdown(LimiterPid),
+ ok = limit_queues(undefined, State),
+ undefined;
+ {_, _} ->
+ LimiterPid
+ end,
+ ok = rabbit_limiter:limit(NewLimiterPid, PrefetchCount),
+ {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = NewLimiterPid}};
handle_method(#'basic.recover'{requeue = true},
_, State = #ch{ transaction_id = none,
- proxy_pid = ProxyPid,
unacked_message_q = UAMQ }) ->
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
@@ -429,14 +501,13 @@ handle_method(#'basic.recover'{requeue = true},
%% order. To keep it happy we reverse the id list
%% since we are given them in reverse order.
rabbit_amqqueue:requeue(
- QPid, lists:reverse(MsgIds), ProxyPid)
+ QPid, lists:reverse(MsgIds), self())
end, ok, UAMQ),
%% No answer required, apparently!
{noreply, State#ch{unacked_message_q = queue:new()}};
handle_method(#'basic.recover'{requeue = false},
_, State = #ch{ transaction_id = none,
- proxy_pid = ProxyPid,
writer_pid = WriterPid,
unacked_message_q = UAMQ }) ->
lists:foreach(
@@ -454,8 +525,7 @@ handle_method(#'basic.recover'{requeue = false},
%%
%% FIXME: should we allocate a fresh DeliveryTag?
ok = internal_deliver(
- WriterPid, ProxyPid,
- false, ConsumerTag, DeliveryTag,
+ WriterPid, false, ConsumerTag, DeliveryTag,
{QName, QPid, MsgId, true, Message})
end, queue:to_list(UAMQ)),
%% No answer required, apparently!
@@ -476,6 +546,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
_, State = #ch{ virtual_host = VHostPath }) ->
CheckedType = rabbit_exchange:check_type(TypeNameBin),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ check_configure_permitted(ExchangeName, State),
X = case rabbit_exchange:lookup(ExchangeName) of
{ok, FoundX} -> FoundX;
{error, not_found} ->
@@ -495,6 +566,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
nowait = NoWait},
_, State = #ch{ virtual_host = VHostPath }) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ check_configure_permitted(ExchangeName, State),
X = rabbit_exchange:lookup_or_die(ExchangeName),
ok = rabbit_exchange:assert_type(X, rabbit_exchange:check_type(TypeNameBin)),
return_ok(State, NoWait, #'exchange.declare_ok'{});
@@ -504,6 +576,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
nowait = NoWait},
_, State = #ch { virtual_host = VHostPath }) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ check_configure_permitted(ExchangeName, State),
case rabbit_exchange:delete(ExchangeName, IfUnused) of
{error, not_found} ->
rabbit_misc:protocol_error(
@@ -554,9 +627,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
Other -> check_name('queue', Other)
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
+ check_configure_permitted(QueueName, State),
Finish(rabbit_amqqueue:declare(QueueName,
Durable, AutoDelete, Args));
- Other -> Other
+ Other = #amqqueue{name = QueueName} ->
+ check_configure_permitted(QueueName, State),
+ Other
end,
return_queue_declare_ok(State, NoWait, Q);
@@ -565,6 +641,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
nowait = NoWait},
_, State = #ch{ virtual_host = VHostPath }) ->
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
+ check_configure_permitted(QueueName, State),
Q = rabbit_amqqueue:with_or_die(QueueName, fun (Q) -> Q end),
return_queue_declare_ok(State, NoWait, Q);
@@ -575,6 +652,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
},
_, State) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ check_configure_permitted(QueueName, State),
case rabbit_amqqueue:with_or_die(
QueueName,
fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
@@ -611,6 +689,7 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
nowait = NoWait},
_, State) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ check_read_permitted(QueueName, State),
{ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die(
QueueName,
fun (Q) -> rabbit_amqqueue:purge(Q) end),
@@ -660,9 +739,11 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
%% FIXME: don't allow binding to internal exchanges -
%% including the one named "" !
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ check_write_permitted(QueueName, State),
ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey,
State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ check_read_permitted(ExchangeName, State),
case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of
{error, exchange_not_found} ->
rabbit_misc:protocol_error(
@@ -748,10 +829,10 @@ add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) ->
State#ch{tx_participants = sets:union(Participants,
sets:from_list(MoreP))}.
-ack(ProxyPid, TxnKey, UAQ) ->
+ack(TxnKey, UAQ) ->
fold_per_queue(
fun (QPid, MsgIds, L) ->
- ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, ProxyPid),
+ ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, self()),
[QPid | L]
end, [], UAQ).
@@ -766,7 +847,9 @@ internal_commit(State = #ch{transaction_id = TxnKey,
tx_participants = Participants}) ->
case rabbit_amqqueue:commit_all(sets:to_list(Participants),
TxnKey) of
- ok -> new_tx(State);
+ ok -> ok = notify_limiter(State#ch.limiter_pid,
+ State#ch.uncommitted_ack_q),
+ new_tx(State);
{error, Errors} -> rabbit_misc:protocol_error(
internal_error, "commit failed: ~w", [Errors])
end.
@@ -803,19 +886,37 @@ fold_per_queue(F, Acc0, UAQ) ->
dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end,
Acc0, D).
-notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) ->
- rabbit_amqqueue:notify_down_all(
- [QPid || QueueName <-
- sets:to_list(
- dict:fold(fun (_ConsumerTag, QueueName, S) ->
- sets:add_element(QueueName, S)
- end, sets:new(), Consumers)),
- case rabbit_amqqueue:lookup(QueueName) of
- {ok, Q} -> QPid = Q#amqqueue.pid, true;
- %% queue has been deleted in the meantime
- {error, not_found} -> QPid = none, false
- end],
- ProxyPid).
+notify_queues(#ch{consumer_mapping = Consumers}) ->
+ rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()).
+
+limit_queues(LPid, #ch{consumer_mapping = Consumers}) ->
+ rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid).
+
+consumer_queues(Consumers) ->
+ [QPid || QueueName <-
+ sets:to_list(
+ dict:fold(fun (_ConsumerTag, QueueName, S) ->
+ sets:add_element(QueueName, S)
+ end, sets:new(), Consumers)),
+ case rabbit_amqqueue:lookup(QueueName) of
+ {ok, Q} -> QPid = Q#amqqueue.pid, true;
+ %% queue has been deleted in the meantime
+ {error, not_found} -> QPid = none, false
+ end].
+
+%% tell the limiter about the number of acks that have been received
+%% for messages delivered to subscribed consumers, but not acks for
+%% messages sent in a response to a basic.get (identified by their
+%% 'none' consumer tag)
+notify_limiter(undefined, _Acked) ->
+ ok;
+notify_limiter(LimiterPid, Acked) ->
+ case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
+ ({_, _, _}, Acc) -> Acc + 1
+ end, 0, queue:to_list(Acked)) of
+ 0 -> ok;
+ Count -> rabbit_limiter:ack(LimiterPid, Count)
+ end.
is_message_persistent(#content{properties = #'P_basic'{
delivery_mode = Mode}}) ->
@@ -823,7 +924,8 @@ is_message_persistent(#content{properties = #'P_basic'{
1 -> false;
2 -> true;
undefined -> false;
- Other -> rabbit_log:warning("Unknown delivery mode ~p - treating as 1, non-persistent~n",
+ Other -> rabbit_log:warning("Unknown delivery mode ~p - "
+ "treating as 1, non-persistent~n",
[Other]),
false
end.
@@ -833,7 +935,7 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
lock_message(false, _MsgStruct, State) ->
State.
-internal_deliver(WriterPid, ChPid, Notify, ConsumerTag, DeliveryTag,
+internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag,
{_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
@@ -845,6 +947,6 @@ internal_deliver(WriterPid, ChPid, Notify, ConsumerTag, DeliveryTag,
routing_key = RoutingKey},
ok = case Notify of
true -> rabbit_writer:send_command_and_notify(
- WriterPid, QPid, ChPid, M, Content);
+ WriterPid, QPid, self(), M, Content);
false -> rabbit_writer:send_command(WriterPid, M, Content)
end.
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 352d7e75..6649899a 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -127,10 +127,10 @@ Available commands:
delete_vhost <VHostPath>
list_vhosts
- map_user_vhost <UserName> <VHostPath>
- unmap_user_vhost <UserName> <VHostPath>
- list_user_vhosts <UserName>
- list_vhost_users <VHostPath>
+ set_permissions [-p <VHostPath>] <UserName> <Regexp> <Regexp> <Regexp>
+ clear_permissions [-p <VHostPath>] <UserName>
+ list_permissions [-p <VHostPath>]
+ list_user_permissions <UserName>
list_queues [-p <VHostPath>] [<QueueInfoItem> ...]
list_exchanges [-p <VHostPath>] [<ExchangeInfoItem> ...]
@@ -236,25 +236,14 @@ action(list_vhosts, Node, [], Inform) ->
Inform("Listing vhosts", []),
display_list(call(Node, {rabbit_access_control, list_vhosts, []}));
-action(map_user_vhost, Node, Args = [_Username, _VHostPath], Inform) ->
- Inform("Mapping user ~p to vhost ~p", Args),
- call(Node, {rabbit_access_control, map_user_vhost, Args});
-
-action(unmap_user_vhost, Node, Args = [_Username, _VHostPath], Inform) ->
- Inform("Unmapping user ~p from vhost ~p", Args),
- call(Node, {rabbit_access_control, unmap_user_vhost, Args});
-
-action(list_user_vhosts, Node, Args = [_Username], Inform) ->
- Inform("Listing vhosts for user ~p", Args),
- display_list(call(Node, {rabbit_access_control, list_user_vhosts, Args}));
-
-action(list_vhost_users, Node, Args = [_VHostPath], Inform) ->
- Inform("Listing users for vhosts ~p", Args),
- display_list(call(Node, {rabbit_access_control, list_vhost_users, Args}));
+action(list_user_permissions, Node, Args = [_Username], Inform) ->
+ Inform("Listing permissions for user ~p", Args),
+ display_list(call(Node, {rabbit_access_control, list_user_permissions,
+ Args}));
action(list_queues, Node, Args, Inform) ->
Inform("Listing queues", []),
- {VHostArg, RemainingArgs} = parse_vhost_flag(Args),
+ {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args),
ArgAtoms = list_replace(node, pid,
default_if_empty(RemainingArgs, [name, messages])),
display_info_list(rpc_call(Node, rabbit_amqqueue, info_all,
@@ -263,7 +252,7 @@ action(list_queues, Node, Args, Inform) ->
action(list_exchanges, Node, Args, Inform) ->
Inform("Listing exchanges", []),
- {VHostArg, RemainingArgs} = parse_vhost_flag(Args),
+ {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args),
ArgAtoms = default_if_empty(RemainingArgs, [name, type]),
display_info_list(rpc_call(Node, rabbit_exchange, info_all,
[VHostArg, ArgAtoms]),
@@ -271,7 +260,7 @@ action(list_exchanges, Node, Args, Inform) ->
action(list_bindings, Node, Args, Inform) ->
Inform("Listing bindings", []),
- {VHostArg, _} = parse_vhost_flag(Args),
+ {VHostArg, _} = parse_vhost_flag_bin(Args),
InfoKeys = [exchange_name, routing_key, queue_name, args],
display_info_list(
[lists:zip(InfoKeys, tuple_to_list(X)) ||
@@ -285,15 +274,37 @@ action(list_connections, Node, Args, Inform) ->
default_if_empty(Args, [user, peer_address, peer_port])),
display_info_list(rpc_call(Node, rabbit_networking, connection_info_all,
[ArgAtoms]),
- ArgAtoms).
+ ArgAtoms);
+
+action(Command, Node, Args, Inform) ->
+ {VHost, RemainingArgs} = parse_vhost_flag(Args),
+ action(Command, Node, VHost, RemainingArgs, Inform).
+
+action(set_permissions, Node, VHost, [Username, CPerm, WPerm, RPerm], Inform) ->
+ Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]),
+ call(Node, {rabbit_access_control, set_permissions,
+ [Username, VHost, CPerm, WPerm, RPerm]});
+
+action(clear_permissions, Node, VHost, [Username], Inform) ->
+ Inform("Clearing permissions for user ~p in vhost ~p", [Username, VHost]),
+ call(Node, {rabbit_access_control, clear_permissions, [Username, VHost]});
+
+action(list_permissions, Node, VHost, [], Inform) ->
+ Inform("Listing permissions in vhost ~p", [VHost]),
+ display_list(call(Node, {rabbit_access_control, list_vhost_permissions,
+ [VHost]})).
parse_vhost_flag(Args) when is_list(Args) ->
- case Args of
- ["-p", VHost | RemainingArgs] ->
- {list_to_binary(VHost), RemainingArgs};
- RemainingArgs ->
- {<<"/">>, RemainingArgs}
- end.
+ case Args of
+ ["-p", VHost | RemainingArgs] ->
+ {VHost, RemainingArgs};
+ RemainingArgs ->
+ {"/", RemainingArgs}
+ end.
+
+parse_vhost_flag_bin(Args) ->
+ {VHost, RemainingArgs} = parse_vhost_flag(Args),
+ {list_to_binary(VHost), RemainingArgs}.
default_if_empty(List, Default) when is_list(List) ->
if List == [] ->
@@ -303,21 +314,17 @@ default_if_empty(List, Default) when is_list(List) ->
end.
display_info_list(Results, InfoItemKeys) when is_list(Results) ->
- lists:foreach(
- fun (Result) ->
- io:fwrite(
- lists:flatten(
- rabbit_misc:intersperse(
- "\t",
- [format_info_item(Result, X) || X <- InfoItemKeys]))),
- io:nl()
- end,
- Results),
+ lists:foreach(fun (Result) -> display_row([format_info_item(Result, X) ||
+ X <- InfoItemKeys])
+ end, Results),
ok;
-
display_info_list(Other, _) ->
Other.
+display_row(Row) ->
+ io:fwrite(lists:flatten(rabbit_misc:intersperse("\t", Row))),
+ io:nl().
+
format_info_item(Items, Key) ->
{value, Info = {Key, Value}} = lists:keysearch(Key, 1, Items),
case Info of
@@ -334,8 +341,10 @@ format_info_item(Items, Key) ->
end.
display_list(L) when is_list(L) ->
- lists:foreach(fun (I) ->
- io:format("~s~n", [binary_to_list(I)])
+ lists:foreach(fun (I) when is_binary(I) ->
+ io:format("~s~n", [url_encode(I)]);
+ (I) when is_tuple(I) ->
+ display_row([url_encode(V) || V <- tuple_to_list(I)])
end,
lists:sort(L)),
ok;
diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl
index 9a9220b5..183b6984 100644
--- a/src/rabbit_error_logger_file_h.erl
+++ b/src/rabbit_error_logger_file_h.erl
@@ -46,7 +46,7 @@ init({{File, Suffix}, []}) ->
case rabbit_misc:append_file(File, Suffix) of
ok -> ok;
{error, Error} ->
- rabbit_log:error("Failed to append contents of " ++
+ rabbit_log:error("Failed to append contents of "
"log file '~s' to '~s':~n~p~n",
[File, [File, Suffix], Error])
end,
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 7f3a78e9..fc89cfca 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -37,11 +37,11 @@
-export([recover/0, declare/5, lookup/1, lookup_or_die/1,
list/1, info/1, info/2, info_all/1, info_all/2,
simple_publish/6, simple_publish/3,
- route/2]).
+ route/3]).
-export([add_binding/4, delete_binding/4, list_bindings/1]).
-export([delete/2]).
-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
--export([check_type/1, assert_type/2, topic_matches/2]).
+-export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]).
%% EXTENDED API
-export([list_exchange_bindings/1]).
@@ -79,7 +79,7 @@
(bool(), bool(), exchange_name(), routing_key(), binary(), binary()) ->
publish_res()).
-spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()).
--spec(route/2 :: (exchange(), routing_key()) -> [pid()]).
+-spec(route/3 :: (exchange(), routing_key(), decoded_content()) -> [pid()]).
-spec(add_binding/4 ::
(exchange_name(), queue_name(), routing_key(), amqp_table()) ->
bind_res() | {'error', 'durability_settings_incompatible'}).
@@ -91,6 +91,7 @@
-spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok').
-spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok').
-spec(topic_matches/2 :: (binary(), binary()) -> bool()).
+-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> bool()).
-spec(delete/2 :: (exchange_name(), bool()) ->
'ok' | not_found() | {'error', 'in_use'}).
-spec(list_queue_bindings/1 :: (queue_name()) ->
@@ -106,14 +107,16 @@
recover() ->
ok = rabbit_misc:table_foreach(
- fun(Exchange) -> ok = mnesia:write(Exchange) end,
- durable_exchanges),
+ fun(Exchange) -> ok = mnesia:write(rabbit_exchange,
+ Exchange, write)
+ end, rabbit_durable_exchange),
ok = rabbit_misc:table_foreach(
fun(Route) -> {_, ReverseRoute} = route_with_reverse(Route),
- ok = mnesia:write(Route),
- ok = mnesia:write(ReverseRoute)
- end, durable_routes),
- ok.
+ ok = mnesia:write(rabbit_route,
+ Route, write),
+ ok = mnesia:write(rabbit_reverse_route,
+ ReverseRoute, write)
+ end, rabbit_durable_route).
declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
Exchange = #exchange{name = ExchangeName,
@@ -123,11 +126,11 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
arguments = Args},
rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:wread({exchange, ExchangeName}) of
- [] -> ok = mnesia:write(Exchange),
+ case mnesia:wread({rabbit_exchange, ExchangeName}) of
+ [] -> ok = mnesia:write(rabbit_exchange, Exchange, write),
if Durable ->
- ok = mnesia:write(
- durable_exchanges, Exchange, write);
+ ok = mnesia:write(rabbit_durable_exchange,
+ Exchange, write);
true -> ok
end,
Exchange;
@@ -141,6 +144,8 @@ check_type(<<"direct">>) ->
direct;
check_type(<<"topic">>) ->
topic;
+check_type(<<"headers">>) ->
+ headers;
check_type(T) ->
rabbit_misc:protocol_error(
command_invalid, "invalid exchange type '~s'", [T]).
@@ -154,7 +159,7 @@ assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) ->
[rabbit_misc:rs(Name), ActualType, RequiredType]).
lookup(Name) ->
- rabbit_misc:dirty_read({exchange, Name}).
+ rabbit_misc:dirty_read({rabbit_exchange, Name}).
lookup_or_die(Name) ->
case lookup(Name) of
@@ -166,6 +171,7 @@ lookup_or_die(Name) ->
list(VHostPath) ->
mnesia:dirty_match_object(
+ rabbit_exchange,
#exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}).
map(VHostPath, F) ->
@@ -207,64 +213,80 @@ simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin,
%% Usable by Erlang code that wants to publish messages.
simple_publish(Mandatory, Immediate,
Message = #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey}) ->
+ routing_key = RoutingKey,
+ content = Content}) ->
case lookup(ExchangeName) of
{ok, Exchange} ->
- QPids = route(Exchange, RoutingKey),
+ QPids = route(Exchange, RoutingKey, Content),
rabbit_router:deliver(QPids, Mandatory, Immediate,
none, Message);
{error, Error} -> {error, Error}
end.
+sort_arguments(Arguments) ->
+ lists:keysort(1, Arguments).
+
%% return the list of qpids to which a message with a given routing
%% key, sent to a particular exchange, should be delivered.
%%
%% The function ensures that a qpid appears in the return list exactly
%% as many times as a message should be delivered to it. With the
%% current exchange types that is at most once.
-%%
+route(X = #exchange{type = topic}, RoutingKey, _Content) ->
+ match_bindings(X, fun (#binding{key = BindingKey}) ->
+ topic_matches(BindingKey, RoutingKey)
+ end);
+
+route(X = #exchange{type = headers}, _RoutingKey, Content) ->
+ Headers = case (Content#content.properties)#'P_basic'.headers of
+ undefined -> [];
+ H -> sort_arguments(H)
+ end,
+ match_bindings(X, fun (#binding{args = Spec}) ->
+ headers_match(Spec, Headers)
+ end);
+
+route(X = #exchange{type = fanout}, _RoutingKey, _Content) ->
+ match_routing_key(X, '_');
+
+route(X = #exchange{type = direct}, RoutingKey, _Content) ->
+ match_routing_key(X, RoutingKey).
+
%% TODO: Maybe this should be handled by a cursor instead.
-route(#exchange{name = Name, type = topic}, RoutingKey) ->
- Query = qlc:q([QName ||
- #route{binding = #binding{
- exchange_name = ExchangeName,
- queue_name = QName,
- key = BindingKey}} <- mnesia:table(route),
- ExchangeName == Name,
- %% TODO: This causes a full scan for each entry
- %% with the same exchange (see bug 19336)
- topic_matches(BindingKey, RoutingKey)]),
+%% TODO: This causes a full scan for each entry with the same exchange
+match_bindings(#exchange{name = Name}, Match) ->
+ Query = qlc:q([QName || #route{binding = Binding = #binding{
+ exchange_name = ExchangeName,
+ queue_name = QName}} <-
+ mnesia:table(rabbit_route),
+ ExchangeName == Name,
+ Match(Binding)]),
lookup_qpids(
try
mnesia:async_dirty(fun qlc:e/1, [Query])
catch exit:{aborted, {badarg, _}} ->
%% work around OTP-7025, which was fixed in R12B-1, by
%% falling back on a less efficient method
- [QName || #route{binding = #binding{queue_name = QName,
- key = BindingKey}} <-
+ [QName || #route{binding = Binding = #binding{
+ queue_name = QName}} <-
mnesia:dirty_match_object(
+ rabbit_route,
#route{binding = #binding{exchange_name = Name,
_ = '_'}}),
- topic_matches(BindingKey, RoutingKey)]
- end);
-
-route(X = #exchange{type = fanout}, _) ->
- route_internal(X, '_');
-
-route(X = #exchange{type = direct}, RoutingKey) ->
- route_internal(X, RoutingKey).
+ Match(Binding)]
+ end).
-route_internal(#exchange{name = Name}, RoutingKey) ->
+match_routing_key(#exchange{name = Name}, RoutingKey) ->
MatchHead = #route{binding = #binding{exchange_name = Name,
queue_name = '$1',
key = RoutingKey,
_ = '_'}},
- lookup_qpids(mnesia:dirty_select(route, [{MatchHead, [], ['$1']}])).
+ lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])).
lookup_qpids(Queues) ->
sets:fold(
fun(Key, Acc) ->
- case mnesia:dirty_read({amqqueue, Key}) of
+ case mnesia:dirty_read({rabbit_queue, Key}) of
[#amqqueue{pid = QPid}] -> [QPid | Acc];
[] -> Acc
end
@@ -275,10 +297,16 @@ lookup_qpids(Queues) ->
%% to be implemented for 0.91 ?
delete_exchange_bindings(ExchangeName) ->
- indexed_delete(
- #route{binding = #binding{exchange_name = ExchangeName,
- _ = '_'}},
- fun delete_forward_routes/1, fun mnesia:delete_object/1).
+ [begin
+ ok = mnesia:delete_object(rabbit_reverse_route,
+ reverse_route(Route), write),
+ ok = delete_forward_routes(Route)
+ end || Route <- mnesia:match_object(
+ rabbit_route,
+ #route{binding = #binding{exchange_name = ExchangeName,
+ _ = '_'}},
+ write)],
+ ok.
delete_queue_bindings(QueueName) ->
delete_queue_bindings(QueueName, fun delete_forward_routes/1).
@@ -288,29 +316,27 @@ delete_transient_queue_bindings(QueueName) ->
delete_queue_bindings(QueueName, FwdDeleteFun) ->
Exchanges = exchanges_for_queue(QueueName),
- indexed_delete(
- reverse_route(#route{binding = #binding{queue_name = QueueName,
- _ = '_'}}),
- fun mnesia:delete_object/1, FwdDeleteFun),
[begin
- [X] = mnesia:read({exchange, ExchangeName}),
+ ok = FwdDeleteFun(reverse_route(Route)),
+ ok = mnesia:delete_object(rabbit_reverse_route, Route, write)
+ end || Route <- mnesia:match_object(
+ rabbit_reverse_route,
+ reverse_route(
+ #route{binding = #binding{queue_name = QueueName,
+ _ = '_'}}),
+ write)],
+ [begin
+ [X] = mnesia:read({rabbit_exchange, ExchangeName}),
ok = maybe_auto_delete(X)
end || ExchangeName <- Exchanges],
ok.
-indexed_delete(Match, ForwardsDeleteFun, ReverseDeleteFun) ->
- [begin
- ok = ReverseDeleteFun(reverse_route(Route)),
- ok = ForwardsDeleteFun(Route)
- end || Route <- mnesia:match_object(Match)],
- ok.
-
delete_forward_routes(Route) ->
- ok = mnesia:delete_object(Route),
- ok = mnesia:delete_object(durable_routes, Route, write).
+ ok = mnesia:delete_object(rabbit_route, Route, write),
+ ok = mnesia:delete_object(rabbit_durable_route, Route, write).
delete_transient_forward_routes(Route) ->
- ok = mnesia:delete_object(Route).
+ ok = mnesia:delete_object(rabbit_route, Route, write).
exchanges_for_queue(QueueName) ->
MatchHead = reverse_route(
@@ -319,7 +345,7 @@ exchanges_for_queue(QueueName) ->
_ = '_'}}),
sets:to_list(
sets:from_list(
- mnesia:select(reverse_route, [{MatchHead, [], ['$1']}]))).
+ mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))).
contains(Table, MatchHead) ->
try
@@ -339,7 +365,7 @@ continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
call_with_exchange(Exchange, Fun) ->
rabbit_misc:execute_mnesia_transaction(
- fun() -> case mnesia:read({exchange, Exchange}) of
+ fun() -> case mnesia:read({rabbit_exchange, Exchange}) of
[] -> {error, not_found};
[X] -> Fun(X)
end
@@ -347,8 +373,8 @@ call_with_exchange(Exchange, Fun) ->
call_with_exchange_and_queue(Exchange, Queue, Fun) ->
rabbit_misc:execute_mnesia_transaction(
- fun() -> case {mnesia:read({exchange, Exchange}),
- mnesia:read({amqqueue, Queue})} of
+ fun() -> case {mnesia:read({rabbit_exchange, Exchange}),
+ mnesia:read({rabbit_queue, Queue})} of
{[X], [Q]} -> Fun(X, Q);
{[ ], [_]} -> {error, exchange_not_found};
{[_], [ ]} -> {error, queue_not_found};
@@ -382,13 +408,15 @@ sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) ->
Binding = #binding{exchange_name = ExchangeName,
queue_name = QueueName,
key = RoutingKey,
- args = Arguments},
+ args = sort_arguments(Arguments)},
ok = case Durable of
- true -> Fun(durable_routes, #route{binding = Binding}, write);
+ true -> Fun(rabbit_durable_route,
+ #route{binding = Binding}, write);
false -> ok
end,
- [ok, ok] = [Fun(element(1, R), R, write) ||
- R <- tuple_to_list(route_with_reverse(Binding))],
+ {Route, ReverseRoute} = route_with_reverse(Binding),
+ ok = Fun(rabbit_route, Route, write),
+ ok = Fun(rabbit_reverse_route, ReverseRoute, write),
ok.
list_bindings(VHostPath) ->
@@ -399,6 +427,7 @@ list_bindings(VHostPath) ->
queue_name = QueueName,
args = Arguments}}
<- mnesia:dirty_match_object(
+ rabbit_route,
#route{binding = #binding{
exchange_name = rabbit_misc:r(VHostPath, exchange),
_ = '_'},
@@ -434,6 +463,67 @@ reverse_binding(#binding{exchange_name = Exchange,
key = Key,
args = Args}.
+default_headers_match_kind() -> all.
+
+parse_x_match(<<"all">>) -> all;
+parse_x_match(<<"any">>) -> any;
+parse_x_match(Other) ->
+ rabbit_log:warning("Invalid x-match field value ~p; expected all or any",
+ [Other]),
+ default_headers_match_kind().
+
+%% Horrendous matching algorithm. Depends for its merge-like
+%% (linear-time) behaviour on the lists:keysort (sort_arguments) that
+%% route/3 and sync_binding/6 do.
+%%
+%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
+%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+%%
+headers_match(Pattern, Data) ->
+ MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of
+ {value, {_, longstr, MK}} -> parse_x_match(MK);
+ {value, {_, Type, MK}} ->
+ rabbit_log:warning("Invalid x-match field type ~p "
+ "(value ~p); expected longstr",
+ [Type, MK]),
+ default_headers_match_kind();
+ _ -> default_headers_match_kind()
+ end,
+ headers_match(Pattern, Data, true, false, MatchKind).
+
+headers_match([], _Data, AllMatch, _AnyMatch, all) ->
+ AllMatch;
+headers_match([], _Data, _AllMatch, AnyMatch, any) ->
+ AnyMatch;
+headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data,
+ AllMatch, AnyMatch, MatchKind) ->
+ headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind);
+headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) ->
+ headers_match([], [], false, AnyMatch, MatchKind);
+headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest],
+ AllMatch, AnyMatch, MatchKind) when PK > DK ->
+ headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind);
+headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _],
+ _AllMatch, AnyMatch, MatchKind) when PK < DK ->
+ headers_match(PRest, Data, false, AnyMatch, MatchKind);
+headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
+ AllMatch, AnyMatch, MatchKind) when PK == DK ->
+ {AllMatch1, AnyMatch1} =
+ if
+ %% It's not properly specified, but a "no value" in a
+ %% pattern field is supposed to mean simple presence of
+ %% the corresponding data field. I've interpreted that to
+ %% mean a type of "void" for the pattern field.
+ PT == void -> {AllMatch, true};
+ %% Similarly, it's not specified, but I assume that a
+ %% mismatched type causes a mismatched value.
+ PT =/= DT -> {false, AnyMatch};
+ PV == DV -> {AllMatch, true};
+ true -> {false, AnyMatch}
+ end,
+ headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind).
+
split_topic_key(Key) ->
{ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."),
KeySplit.
@@ -477,15 +567,15 @@ conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
%% we need to check for durable routes here too in case a bunch of
%% routes to durable queues have been removed temporarily as a
%% result of a node failure
- case contains(route, Match) orelse contains(durable_routes, Match) of
+ case contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match) of
false -> unconditional_delete(Exchange);
true -> {error, in_use}
end.
unconditional_delete(#exchange{name = ExchangeName}) ->
ok = delete_exchange_bindings(ExchangeName),
- ok = mnesia:delete({durable_exchanges, ExchangeName}),
- ok = mnesia:delete({exchange, ExchangeName}).
+ ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}),
+ ok = mnesia:delete({rabbit_exchange, ExchangeName}).
%%----------------------------------------------------------------------------
%% EXTENDED API
@@ -501,7 +591,7 @@ list_exchange_bindings(ExchangeName) ->
#route{binding = #binding{queue_name = QueueName,
key = RoutingKey,
args = Arguments}}
- <- mnesia:dirty_match_object(Route)].
+ <- mnesia:dirty_match_object(rabbit_route, Route)].
% Refactoring is left as an exercise for the reader
list_queue_bindings(QueueName) ->
@@ -511,4 +601,4 @@ list_queue_bindings(QueueName) ->
#route{binding = #binding{exchange_name = ExchangeName,
key = RoutingKey,
args = Arguments}}
- <- mnesia:dirty_match_object(Route)].
+ <- mnesia:dirty_match_object(rabbit_route, Route)].
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
index 060bed48..5c447792 100644
--- a/src/rabbit_framing_channel.erl
+++ b/src/rabbit_framing_channel.erl
@@ -95,13 +95,15 @@ collect_content(ChannelPid, MethodName) ->
true ->
rabbit_misc:protocol_error(
command_invalid,
- "expected content header for class ~w, got one for class ~w instead",
+ "expected content header for class ~w, "
+ "got one for class ~w instead",
[ClassId, HeaderClassId])
end;
_ ->
rabbit_misc:protocol_error(
command_invalid,
- "expected content header for class ~w, got non content header frame instead",
+ "expected content header for class ~w, "
+ "got non content header frame instead",
[ClassId])
end.
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 51c1665b..2be00503 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -82,7 +82,8 @@ guid() ->
%% and time. We combine that with a process-local counter to give
%% us a GUID that is monotonically increasing per process.
G = case get(guid) of
- undefined -> {{gen_server:call(?SERVER, serial), self()}, 0};
+ undefined -> {{gen_server:call(?SERVER, serial, infinity), self()},
+ 0};
{S, I} -> {S, I+1}
end,
put(guid, G),
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
new file mode 100644
index 00000000..3f9b6ebb
--- /dev/null
+++ b/src/rabbit_limiter.erl
@@ -0,0 +1,195 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_limiter).
+
+-behaviour(gen_server).
+
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
+ handle_info/2]).
+-export([start_link/1, shutdown/1]).
+-export([limit/2, can_send/2, ack/2, register/2, unregister/2]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(maybe_pid() :: pid() | 'undefined').
+
+-spec(start_link/1 :: (pid()) -> pid()).
+-spec(shutdown/1 :: (maybe_pid()) -> 'ok').
+-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
+-spec(can_send/2 :: (maybe_pid(), pid()) -> bool()).
+-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
+-spec(register/2 :: (maybe_pid(), pid()) -> 'ok').
+-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+-record(lim, {prefetch_count = 0,
+ ch_pid,
+ queues = dict:new(), % QPid -> {MonitorRef, Notify}
+ volume = 0}).
+%% 'Notify' is a boolean that indicates whether a queue should be
+%% notified of a change in the limit or volume that may allow it to
+%% deliver more messages via the limiter's channel.
+
+%%----------------------------------------------------------------------------
+%% API
+%%----------------------------------------------------------------------------
+
+start_link(ChPid) ->
+ {ok, Pid} = gen_server:start_link(?MODULE, [ChPid], []),
+ Pid.
+
+shutdown(undefined) ->
+ ok;
+shutdown(LimiterPid) ->
+ unlink(LimiterPid),
+ gen_server2:cast(LimiterPid, shutdown).
+
+limit(undefined, 0) ->
+ ok;
+limit(LimiterPid, PrefetchCount) ->
+ gen_server2:cast(LimiterPid, {limit, PrefetchCount}).
+
+%% Ask the limiter whether the queue can deliver a message without
+%% breaching a limit
+can_send(undefined, _QPid) ->
+ true;
+can_send(LimiterPid, QPid) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> true end,
+ fun () -> gen_server2:call(LimiterPid, {can_send, QPid}, infinity) end).
+
+%% Let the limiter know that the channel has received some acks from a
+%% consumer
+ack(undefined, _Count) -> ok;
+ack(LimiterPid, Count) -> gen_server2:cast(LimiterPid, {ack, Count}).
+
+register(undefined, _QPid) -> ok;
+register(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {register, QPid}).
+
+unregister(undefined, _QPid) -> ok;
+unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid}).
+
+%%----------------------------------------------------------------------------
+%% gen_server callbacks
+%%----------------------------------------------------------------------------
+
+init([ChPid]) ->
+ {ok, #lim{ch_pid = ChPid} }.
+
+handle_call({can_send, QPid}, _From, State = #lim{volume = Volume}) ->
+ case limit_reached(State) of
+ true -> {reply, false, limit_queue(QPid, State)};
+ false -> {reply, true, State#lim{volume = Volume + 1}}
+ end.
+
+handle_cast(shutdown, State) ->
+ {stop, normal, State};
+
+handle_cast({limit, PrefetchCount}, State) ->
+ {noreply, maybe_notify(State, State#lim{prefetch_count = PrefetchCount})};
+
+handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
+ NewVolume = if Volume == 0 -> 0;
+ true -> Volume - Count
+ end,
+ {noreply, maybe_notify(State, State#lim{volume = NewVolume})};
+
+handle_cast({register, QPid}, State) ->
+ {noreply, remember_queue(QPid, State)};
+
+handle_cast({unregister, QPid}, State) ->
+ {noreply, forget_queue(QPid, State)}.
+
+handle_info({'DOWN', _MonitorRef, _Type, QPid, _Info}, State) ->
+ {noreply, forget_queue(QPid, State)}.
+
+terminate(_, _) ->
+ ok.
+
+code_change(_, State, _) ->
+ State.
+
+%%----------------------------------------------------------------------------
+%% Internal plumbing
+%%----------------------------------------------------------------------------
+
+maybe_notify(OldState, NewState) ->
+ case limit_reached(OldState) andalso not(limit_reached(NewState)) of
+ true -> notify_queues(NewState);
+ false -> NewState
+ end.
+
+limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
+ Limit =/= 0 andalso Volume >= Limit.
+
+remember_queue(QPid, State = #lim{queues = Queues}) ->
+ case dict:is_key(QPid, Queues) of
+ false -> MRef = erlang:monitor(process, QPid),
+ State#lim{queues = dict:store(QPid, {MRef, false}, Queues)};
+ true -> State
+ end.
+
+forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) ->
+ case dict:find(QPid, Queues) of
+ {ok, {MRef, _}} ->
+ true = erlang:demonitor(MRef),
+ ok = rabbit_amqqueue:unblock(QPid, ChPid),
+ State#lim{queues = dict:erase(QPid, Queues)};
+ error -> State
+ end.
+
+limit_queue(QPid, State = #lim{queues = Queues}) ->
+ UpdateFun = fun ({MRef, _}) -> {MRef, true} end,
+ State#lim{queues = dict:update(QPid, UpdateFun, Queues)}.
+
+notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
+ {QList, NewQueues} =
+ dict:fold(fun (_QPid, {_, false}, Acc) -> Acc;
+ (QPid, {MRef, true}, {L, D}) ->
+ {[QPid | L], dict:store(QPid, {MRef, false}, D)}
+ end, {[], Queues}, Queues),
+ case length(QList) of
+ 0 -> ok;
+ L ->
+ %% We randomly vary the position of queues in the list,
+ %% thus ensuring that each queue has an equal chance of
+ %% being notified first.
+ {L1, L2} = lists:split(random:uniform(L), QList),
+ [ok = rabbit_amqqueue:unblock(Q, ChPid) || Q <- L2 ++ L1],
+ ok
+ end,
+ State#lim{queues = NewQueues}.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 1fcd9a61..eced0b3c 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -50,6 +50,7 @@
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([format_stderr/2]).
+-export([start_applications/1, stop_applications/1]).
-import(mnesia).
-import(lists).
@@ -106,6 +107,8 @@
-spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}).
-spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok').
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
+-spec(start_applications/1 :: ([atom()]) -> 'ok').
+-spec(stop_applications/1 :: ([atom()]) -> 'ok').
-endif.
@@ -232,7 +235,7 @@ filter_exit_map(F, L) ->
with_user(Username, Thunk) ->
fun () ->
- case mnesia:read({user, Username}) of
+ case mnesia:read({rabbit_user, Username}) of
[] ->
mnesia:abort({no_such_user, Username});
[_U] ->
@@ -242,7 +245,7 @@ with_user(Username, Thunk) ->
with_vhost(VHostPath, Thunk) ->
fun () ->
- case mnesia:read({vhost, VHostPath}) of
+ case mnesia:read({rabbit_vhost, VHostPath}) of
[] ->
mnesia:abort({no_such_vhost, VHostPath});
[_V] ->
@@ -385,3 +388,32 @@ format_stderr(Fmt, Args) ->
io:format(Fmt, Args)
end,
ok.
+
+manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) ->
+ Iterate(fun (App, Acc) ->
+ case Do(App) of
+ ok -> [App | Acc];
+ {error, {SkipError, _}} -> Acc;
+ {error, Reason} ->
+ lists:foreach(Undo, Acc),
+ throw({error, {ErrorTag, App, Reason}})
+ end
+ end, [], Apps),
+ ok.
+
+start_applications(Apps) ->
+ manage_applications(fun lists:foldl/3,
+ fun application:start/1,
+ fun application:stop/1,
+ already_started,
+ cannot_start_application,
+ Apps).
+
+stop_applications(Apps) ->
+ manage_applications(fun lists:foldr/3,
+ fun application:stop/1,
+ fun application:start/1,
+ not_started,
+ cannot_stop_application,
+ Apps).
+
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 0c573073..575ecb0a 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -101,33 +101,50 @@ force_reset() -> reset(true).
%%--------------------------------------------------------------------
table_definitions() ->
- [{user, [{disc_copies, [node()]},
- {attributes, record_info(fields, user)}]},
- {user_vhost, [{type, bag},
- {disc_copies, [node()]},
- {attributes, record_info(fields, user_vhost)},
- {index, [virtual_host]}]},
- {vhost, [{disc_copies, [node()]},
- {attributes, record_info(fields, vhost)}]},
- {rabbit_config, [{disc_copies, [node()]}]},
- {listener, [{type, bag},
- {attributes, record_info(fields, listener)}]},
- {durable_routes, [{disc_copies, [node()]},
- {record_name, route},
- {attributes, record_info(fields, route)}]},
- {route, [{type, ordered_set},
- {attributes, record_info(fields, route)}]},
- {reverse_route, [{type, ordered_set},
- {attributes, record_info(fields, reverse_route)}]},
- {durable_exchanges, [{disc_copies, [node()]},
- {record_name, exchange},
- {attributes, record_info(fields, exchange)}]},
- {exchange, [{attributes, record_info(fields, exchange)}]},
- {durable_queues, [{disc_copies, [node()]},
- {record_name, amqqueue},
- {attributes, record_info(fields, amqqueue)}]},
- {amqqueue, [{attributes, record_info(fields, amqqueue)},
- {index, [pid]}]}].
+ [{rabbit_user,
+ [{record_name, user},
+ {attributes, record_info(fields, user)},
+ {disc_copies, [node()]}]},
+ {rabbit_user_permission,
+ [{record_name, user_permission},
+ {attributes, record_info(fields, user_permission)},
+ {disc_copies, [node()]}]},
+ {rabbit_vhost,
+ [{record_name, vhost},
+ {attributes, record_info(fields, vhost)},
+ {disc_copies, [node()]}]},
+ {rabbit_config,
+ [{disc_copies, [node()]}]},
+ {rabbit_listener,
+ [{record_name, listener},
+ {attributes, record_info(fields, listener)},
+ {type, bag}]},
+ {rabbit_durable_route,
+ [{record_name, route},
+ {attributes, record_info(fields, route)},
+ {disc_copies, [node()]}]},
+ {rabbit_route,
+ [{record_name, route},
+ {attributes, record_info(fields, route)},
+ {type, ordered_set}]},
+ {rabbit_reverse_route,
+ [{record_name, reverse_route},
+ {attributes, record_info(fields, reverse_route)},
+ {type, ordered_set}]},
+ {rabbit_durable_exchange,
+ [{record_name, exchange},
+ {attributes, record_info(fields, exchange)},
+ {disc_copies, [node()]}]},
+ {rabbit_exchange,
+ [{record_name, exchange},
+ {attributes, record_info(fields, exchange)}]},
+ {rabbit_durable_queue,
+ [{record_name, amqqueue},
+ {attributes, record_info(fields, amqqueue)},
+ {disc_copies, [node()]}]},
+ {rabbit_queue,
+ [{record_name, amqqueue},
+ {attributes, record_info(fields, amqqueue)}]}].
table_names() ->
[Tab || {Tab, _} <- table_definitions()].
@@ -246,8 +263,8 @@ init_db(ClusterNodes) ->
%% NB: we cannot use rabbit_log here since
%% it may not have been started yet
error_logger:warning_msg(
- "schema integrity check failed: ~p~n" ++
- "moving database to backup location " ++
+ "schema integrity check failed: ~p~n"
+ "moving database to backup location "
"and recreating schema from scratch~n",
[Reason]),
ok = move_db(),
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 99ea37d8..2dbd5a5a 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -123,6 +123,7 @@ stop_tcp_listener(Host, Port) ->
tcp_listener_started(IPAddress, Port) ->
ok = mnesia:dirty_write(
+ rabbit_listener,
#listener{node = node(),
protocol = tcp,
host = tcp_host(IPAddress),
@@ -130,19 +131,20 @@ tcp_listener_started(IPAddress, Port) ->
tcp_listener_stopped(IPAddress, Port) ->
ok = mnesia:dirty_delete_object(
+ rabbit_listener,
#listener{node = node(),
protocol = tcp,
host = tcp_host(IPAddress),
port = Port}).
active_listeners() ->
- rabbit_misc:dirty_read_all(listener).
+ rabbit_misc:dirty_read_all(rabbit_listener).
node_listeners(Node) ->
- mnesia:dirty_read(listener, Node).
+ mnesia:dirty_read(rabbit_listener, Node).
on_node_down(Node) ->
- ok = mnesia:dirty_delete(listener, Node).
+ ok = mnesia:dirty_delete(rabbit_listener, Node).
start_client(Sock) ->
{ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []),
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index 6e332484..d0d60ddf 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -49,6 +49,8 @@
-define(LOG_BUNDLE_DELAY, 5).
-define(COMPLETE_BUNDLE_DELAY, 2).
+-define(HIBERNATE_AFTER, 10000).
+
-define(MAX_WRAP_ENTRIES, 500).
-define(PERSISTER_LOG_FORMAT_VERSION, {2, 4}).
@@ -93,7 +95,7 @@ start_link() ->
transaction(MessageList) ->
?LOGDEBUG("transaction ~p~n", [MessageList]),
TxnKey = rabbit_guid:guid(),
- gen_server:call(?SERVER, {transaction, TxnKey, MessageList}).
+ gen_server:call(?SERVER, {transaction, TxnKey, MessageList}, infinity).
extend_transaction(TxnKey, MessageList) ->
?LOGDEBUG("extend_transaction ~p ~p~n", [TxnKey, MessageList]),
@@ -105,17 +107,17 @@ dirty_work(MessageList) ->
commit_transaction(TxnKey) ->
?LOGDEBUG("commit_transaction ~p~n", [TxnKey]),
- gen_server:call(?SERVER, {commit_transaction, TxnKey}).
+ gen_server:call(?SERVER, {commit_transaction, TxnKey}, infinity).
rollback_transaction(TxnKey) ->
?LOGDEBUG("rollback_transaction ~p~n", [TxnKey]),
gen_server:cast(?SERVER, {rollback_transaction, TxnKey}).
force_snapshot() ->
- gen_server:call(?SERVER, force_snapshot).
+ gen_server:call(?SERVER, force_snapshot, infinity).
serial() ->
- gen_server:call(?SERVER, serial).
+ gen_server:call(?SERVER, serial, infinity).
%%--------------------------------------------------------------------
@@ -164,10 +166,8 @@ handle_call({transaction, Key, MessageList}, From, State) ->
do_noreply(internal_commit(From, Key, NewState));
handle_call({commit_transaction, TxnKey}, From, State) ->
do_noreply(internal_commit(From, TxnKey, State));
-handle_call(force_snapshot, _From, State = #pstate{log_handle = LH,
- snapshot = Snapshot}) ->
- ok = take_snapshot(LH, Snapshot),
- do_reply(ok, State);
+handle_call(force_snapshot, _From, State) ->
+ do_reply(ok, flush(true, State));
handle_call(serial, _From,
State = #pstate{snapshot = #psnapshot{serial = Serial}}) ->
do_reply(Serial, State);
@@ -183,8 +183,13 @@ handle_cast({extend_transaction, TxnKey, MessageList}, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
+handle_info(timeout, State = #pstate{deadline = infinity}) ->
+ State1 = flush(true, State),
+ %% TODO: Once we drop support for R11B-5, we can change this to
+ %% {noreply, State1, hibernate};
+ proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State1]);
handle_info(timeout, State) ->
- {noreply, flush(State)};
+ do_noreply(flush(State));
handle_info(_Info, State) ->
{noreply, State}.
@@ -275,12 +280,13 @@ take_snapshot_and_save_old(LogHandle, Snapshot) ->
rabbit_log:info("Saving persister log in ~p~n", [OldFileName]),
ok = take_snapshot(LogHandle, OldFileName, Snapshot).
-maybe_take_snapshot(State = #pstate{entry_count = EntryCount, log_handle = LH,
- snapshot = Snapshot})
- when EntryCount >= ?MAX_WRAP_ENTRIES ->
+maybe_take_snapshot(Force, State = #pstate{entry_count = EntryCount,
+ log_handle = LH,
+ snapshot = Snapshot})
+ when Force orelse EntryCount >= ?MAX_WRAP_ENTRIES ->
ok = take_snapshot(LH, Snapshot),
State#pstate{entry_count = 0};
-maybe_take_snapshot(State) ->
+maybe_take_snapshot(_Force, State) ->
State.
later_ms(DeltaMilliSec) ->
@@ -298,7 +304,7 @@ compute_deadline(_TimerDelay, ExistingDeadline) ->
ExistingDeadline.
compute_timeout(infinity) ->
- infinity;
+ ?HIBERNATE_AFTER;
compute_timeout(Deadline) ->
DeltaMilliSec = time_diff(Deadline, now()) * 1000.0,
if
@@ -314,18 +320,18 @@ do_noreply(State = #pstate{deadline = Deadline}) ->
do_reply(Reply, State = #pstate{deadline = Deadline}) ->
{reply, Reply, State, compute_timeout(Deadline)}.
-flush(State = #pstate{pending_logs = PendingLogs,
- pending_replies = Waiting,
- log_handle = LogHandle}) ->
- State1 = if
- PendingLogs /= [] ->
+flush(State) -> flush(false, State).
+
+flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs,
+ pending_replies = Waiting,
+ log_handle = LogHandle}) ->
+ State1 = if PendingLogs /= [] ->
disk_log:alog(LogHandle, lists:reverse(PendingLogs)),
- maybe_take_snapshot(
- State#pstate{
- entry_count = State#pstate.entry_count + 1});
- true ->
+ State#pstate{entry_count = State#pstate.entry_count + 1};
+ true ->
State
end,
+ State2 = maybe_take_snapshot(ForceSnapshot, State1),
if Waiting /= [] ->
ok = disk_log:sync(LogHandle),
lists:foreach(fun (From) -> gen_server:reply(From, ok) end,
@@ -333,7 +339,7 @@ flush(State = #pstate{pending_logs = PendingLogs,
true ->
ok
end,
- State1#pstate{deadline = infinity,
+ State2#pstate{deadline = infinity,
pending_logs = [],
pending_replies = []}.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 3f8d7cac..ef8038e7 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -161,10 +161,10 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) ->
{ok, Misc}.
info(Pid) ->
- gen_server:call(Pid, info).
+ gen_server:call(Pid, info, infinity).
info(Pid, Items) ->
- case gen_server:call(Pid, {info, Items}) of
+ case gen_server:call(Pid, {info, Items}, infinity) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -173,7 +173,8 @@ setup_profiling() ->
Value = rabbit_misc:get_config(profiling_enabled, false),
case Value of
once ->
- rabbit_log:info("Enabling profiling for this connection, and disabling for subsequent.~n"),
+ rabbit_log:info("Enabling profiling for this connection, "
+ "and disabling for subsequent.~n"),
rabbit_misc:set_config(profiling_enabled, false),
fprof:trace(start);
true ->
@@ -230,8 +231,12 @@ start_connection(Parent, Deb, ClientSock) ->
connection_state = pre_init},
handshake, 8))
catch
- Ex -> rabbit_log:error("error on TCP connection ~p from ~s:~p~n~p~n",
- [self(), PeerAddressS, PeerPort, Ex])
+ Ex -> (if Ex == connection_closed_abruptly ->
+ fun rabbit_log:warning/2;
+ true ->
+ fun rabbit_log:error/2
+ end)("exception on TCP connection ~p from ~s:~p~n~p~n",
+ [self(), PeerAddressS, PeerPort, Ex])
after
rabbit_log:info("closing TCP connection ~p from ~s:~p~n",
[self(), PeerAddressS, PeerPort]),
@@ -283,6 +288,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
exit(Reason);
{'EXIT', _Pid, E = {writer, send_failed, _Error}} ->
throw(E);
+ {channel_exit, Channel, Reason} ->
+ mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State));
{'EXIT', Pid, Reason} ->
mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State));
{terminate_channel, Channel, Ref1} ->
@@ -350,6 +357,14 @@ terminate_channel(Channel, Ref, State) ->
end,
State.
+handle_channel_exit(Channel, Reason, State) ->
+ %% We remove the channel from the inbound map only. That allows
+ %% the channel to be re-opened, but also means the remaining
+ %% cleanup, including possibly closing the connection, is deferred
+ %% until we get the (normal) exit signal.
+ erase({channel, Channel}),
+ handle_exception(State, Channel, Reason).
+
handle_dependent_exit(Pid, normal, State) ->
channel_cleanup(Pid),
maybe_close(State);
@@ -404,7 +419,8 @@ wait_for_channel_termination(N, TimerRef) ->
normal -> ok;
_ ->
rabbit_log:error(
- "connection ~p, channel ~p - error while terminating:~n~p~n",
+ "connection ~p, channel ~p - "
+ "error while terminating:~n~p~n",
[self(), Channel, Reason])
end,
wait_for_channel_termination(N-1, TimerRef)
@@ -709,8 +725,8 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
vhost = VHost}} = State,
WriterPid = rabbit_writer:start(Sock, Channel, FrameMax),
ChPid = rabbit_framing_channel:start_link(
- fun rabbit_channel:start_link/4,
- [self(), WriterPid, Username, VHost]),
+ fun rabbit_channel:start_link/5,
+ [Channel, self(), WriterPid, Username, VHost]),
put({channel, Channel}, {chpid, ChPid}),
put({chpid, ChPid}, {channel, Channel}),
ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame);
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index ad653a2f..0b06a063 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -32,7 +32,7 @@
-module(rabbit_router).
-include("rabbit.hrl").
--behaviour(gen_server).
+-behaviour(gen_server2).
-export([start_link/0,
deliver/5]).
@@ -58,7 +58,7 @@
%%----------------------------------------------------------------------------
start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+ gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
-ifdef(BUG19758).
@@ -100,7 +100,7 @@ deliver_per_node(NodeQPids, Mandatory = false, Immediate = false,
%% than the non-immediate case below.
{ok, lists:flatmap(
fun ({Node, QPids}) ->
- gen_server:cast(
+ gen_server2:cast(
{?SERVER, Node},
{deliver, QPids, Mandatory, Immediate, Txn, Message}),
QPids
@@ -110,9 +110,10 @@ deliver_per_node(NodeQPids, Mandatory, Immediate,
Txn, Message) ->
R = rabbit_misc:upmap(
fun ({Node, QPids}) ->
- try gen_server:call(
+ try gen_server2:call(
{?SERVER, Node},
- {deliver, QPids, Mandatory, Immediate, Txn, Message})
+ {deliver, QPids, Mandatory, Immediate, Txn, Message},
+ infinity)
catch
_Class:_Reason ->
%% TODO: figure out what to log (and do!) here
@@ -143,7 +144,7 @@ handle_call({deliver, QPids, Mandatory, Immediate, Txn, Message},
spawn(
fun () ->
R = run_bindings(QPids, Mandatory, Immediate, Txn, Message),
- gen_server:reply(From, R)
+ gen_server2:reply(From, R)
end),
{noreply, State}.
diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl
index 9e4c9c8a..2a365ce1 100644
--- a/src/rabbit_sasl_report_file_h.erl
+++ b/src/rabbit_sasl_report_file_h.erl
@@ -47,7 +47,7 @@ init({{File, Suffix}, []}) ->
case rabbit_misc:append_file(File, Suffix) of
ok -> ok;
{error, Error} ->
- rabbit_log:error("Failed to append contents of " ++
+ rabbit_log:error("Failed to append contents of "
"sasl log file '~s' to '~s':~n~p~n",
[File, [File, Suffix], Error])
end,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 6706ecd1..8f0a3a89 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -45,6 +45,7 @@ test_content_prop_roundtrip(Datum, Binary) ->
Binary = rabbit_binary_generator:encode_properties(Types, Values). %% assertion
all_tests() ->
+ passed = test_priority_queue(),
passed = test_parsing(),
passed = test_topic_matching(),
passed = test_log_management(),
@@ -55,6 +56,62 @@ all_tests() ->
passed = test_server_status(),
passed.
+test_priority_queue() ->
+
+ false = priority_queue:is_queue(not_a_queue),
+
+ %% empty Q
+ Q = priority_queue:new(),
+ {true, true, 0, [], []} = test_priority_queue(Q),
+
+ %% 1-4 element no-priority Q
+ true = lists:all(fun (X) -> X =:= passed end,
+ lists:map(fun test_simple_n_element_queue/1,
+ lists:seq(1, 4))),
+
+ %% 1-element priority Q
+ Q1 = priority_queue:in(foo, 1, priority_queue:new()),
+ {true, false, 1, [{1, foo}], [foo]} = test_priority_queue(Q1),
+
+ %% 2-element same-priority Q
+ Q2 = priority_queue:in(bar, 1, Q1),
+ {true, false, 2, [{1, foo}, {1, bar}], [foo, bar]} =
+ test_priority_queue(Q2),
+
+ %% 2-element different-priority Q
+ Q3 = priority_queue:in(bar, 2, Q1),
+ {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} =
+ test_priority_queue(Q3),
+
+ %% 1-element negative priority Q
+ Q4 = priority_queue:in(foo, -1, priority_queue:new()),
+ {true, false, 1, [{-1, foo}], [foo]} = test_priority_queue(Q4),
+
+ passed.
+
+priority_queue_in_all(Q, L) ->
+ lists:foldl(fun (X, Acc) -> priority_queue:in(X, Acc) end, Q, L).
+
+priority_queue_out_all(Q) ->
+ case priority_queue:out(Q) of
+ {empty, _} -> [];
+ {{value, V}, Q1} -> [V | priority_queue_out_all(Q1)]
+ end.
+
+test_priority_queue(Q) ->
+ {priority_queue:is_queue(Q),
+ priority_queue:is_empty(Q),
+ priority_queue:len(Q),
+ priority_queue:to_list(Q),
+ priority_queue_out_all(Q)}.
+
+test_simple_n_element_queue(N) ->
+ Items = lists:seq(1, N),
+ Q = priority_queue_in_all(priority_queue:new(), Items),
+ ToListRes = [{0, X} || X <- Items],
+ {true, false, N, ToListRes, Items} = test_priority_queue(Q),
+ passed.
+
test_parsing() ->
passed = test_content_properties(),
passed.
@@ -450,17 +507,16 @@ test_user_management() ->
{error, {no_such_vhost, _}} =
control_action(delete_vhost, ["/testhost"]),
{error, {no_such_user, _}} =
- control_action(map_user_vhost, ["foo", "/"]),
+ control_action(set_permissions, ["foo", ".*", ".*", ".*"]),
{error, {no_such_user, _}} =
- control_action(unmap_user_vhost, ["foo", "/"]),
+ control_action(clear_permissions, ["foo"]),
{error, {no_such_user, _}} =
- control_action(list_user_vhosts, ["foo"]),
- {error, {no_such_vhost, _}} =
- control_action(map_user_vhost, ["guest", "/testhost"]),
+ control_action(list_user_permissions, ["foo"]),
{error, {no_such_vhost, _}} =
- control_action(unmap_user_vhost, ["guest", "/testhost"]),
- {error, {no_such_vhost, _}} =
- control_action(list_vhost_users, ["/testhost"]),
+ control_action(list_permissions, ["-p", "/testhost"]),
+ {error, {invalid_regexp, _, _}} =
+ control_action(set_permissions, ["guest", "+foo", ".*", ".*"]),
+
%% user creation
ok = control_action(add_user, ["foo", "bar"]),
{error, {user_already_exists, _}} =
@@ -475,13 +531,16 @@ test_user_management() ->
ok = control_action(list_vhosts, []),
%% user/vhost mapping
- ok = control_action(map_user_vhost, ["foo", "/testhost"]),
- ok = control_action(map_user_vhost, ["foo", "/testhost"]),
- ok = control_action(list_user_vhosts, ["foo"]),
+ ok = control_action(set_permissions, ["-p", "/testhost",
+ "foo", ".*", ".*", ".*"]),
+ ok = control_action(set_permissions, ["-p", "/testhost",
+ "foo", ".*", ".*", ".*"]),
+ ok = control_action(list_permissions, ["-p", "/testhost"]),
+ ok = control_action(list_user_permissions, ["foo"]),
%% user/vhost unmapping
- ok = control_action(unmap_user_vhost, ["foo", "/testhost"]),
- ok = control_action(unmap_user_vhost, ["foo", "/testhost"]),
+ ok = control_action(clear_permissions, ["-p", "/testhost", "foo"]),
+ ok = control_action(clear_permissions, ["-p", "/testhost", "foo"]),
%% vhost deletion
ok = control_action(delete_vhost, ["/testhost"]),
@@ -490,7 +549,8 @@ test_user_management() ->
%% deleting a populated vhost
ok = control_action(add_vhost, ["/testhost"]),
- ok = control_action(map_user_vhost, ["foo", "/testhost"]),
+ ok = control_action(set_permissions, ["-p", "/testhost",
+ "foo", ".*", ".*", ".*"]),
ok = control_action(delete_vhost, ["/testhost"]),
%% user deletion