summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVlad Ionescu <vlad@lshift.net>2009-06-13 12:25:29 +0100
committerVlad Ionescu <vlad@lshift.net>2009-06-13 12:25:29 +0100
commitce13e65faebc930a84147a26a7e3dabc2298c55c (patch)
tree6492621234b8958cf8bba995d6ea4dab2493a7f8
parent35863cd5d7e3b01670b828c7d656d45d5e122c0f (diff)
parent0b18bf63b8fda7961562cb4b9fb1575eb0f694ef (diff)
downloadrabbitmq-server-bug19911.tar.gz
merging bug19911 into junkbug19911
-rw-r--r--.hgignore1
-rw-r--r--Makefile24
-rw-r--r--docs/rabbitmqctl.1.pod67
-rw-r--r--ebin/rabbit.app57
-rw-r--r--ebin/rabbit_app.in21
-rw-r--r--generate_app10
-rw-r--r--include/rabbit.hrl7
-rw-r--r--packaging/windows/rabbitmq-service.pod6
-rw-r--r--src/buffering_proxy.erl108
-rw-r--r--src/rabbit.erl42
-rw-r--r--src/rabbit_access_control.erl14
-rw-r--r--src/rabbit_alarm.erl34
-rw-r--r--src/rabbit_amqqueue.erl35
-rw-r--r--src/rabbit_amqqueue_process.erl318
-rw-r--r--src/rabbit_channel.erl80
-rw-r--r--src/rabbit_control.erl93
-rw-r--r--src/rabbit_error_logger_file_h.erl2
-rw-r--r--src/rabbit_exchange.erl134
-rw-r--r--src/rabbit_framing_channel.erl6
-rw-r--r--src/rabbit_misc.erl15
-rw-r--r--src/rabbit_mnesia.erl75
-rw-r--r--src/rabbit_networking.erl8
-rw-r--r--src/rabbit_reader.erl52
-rw-r--r--src/rabbit_router.erl76
-rw-r--r--src/rabbit_sasl_report_file_h.erl2
-rw-r--r--src/rabbit_tests.erl31
26 files changed, 705 insertions, 613 deletions
diff --git a/.hgignore b/.hgignore
index 35607765..28f9cfd8 100644
--- a/.hgignore
+++ b/.hgignore
@@ -9,7 +9,6 @@ syntax: regexp
^include/rabbit_framing.hrl$
^src/rabbit_framing.erl$
^rabbit.plt$
-^ebin/rabbit.app$
^packaging/RPMS/Fedora/(BUILD|RPMS|SOURCES|SPECS|SRPMS)$
^packaging/debs/Debian/rabbitmq-server_.*\.(dsc|(diff|tar)\.gz|deb|changes)$
diff --git a/Makefile b/Makefile
index 4ff8573a..13afb9ac 100644
--- a/Makefile
+++ b/Makefile
@@ -7,8 +7,7 @@ SOURCE_DIR=src
EBIN_DIR=ebin
INCLUDE_DIR=include
SOURCES=$(wildcard $(SOURCE_DIR)/*.erl)
-BEAM_TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam,$(SOURCES))
-TARGETS=$(EBIN_DIR)/rabbit.app $(BEAM_TARGETS)
+TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam,$(SOURCES))
WEB_URL=http://stage.rabbitmq.com/
MANPAGES=$(patsubst %.pod, %.gz, $(wildcard docs/*.[0-9].pod))
@@ -17,7 +16,7 @@ PYTHON=python
ifndef USE_SPECS
# our type specs rely on features / bug fixes in dialyzer that are
# only available in R12B-3 upwards
-#
+#
# NB: the test assumes that version number will only contain single digits
USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.6.2" ]; then echo "true"; else echo "false"; fi)
endif
@@ -40,15 +39,9 @@ ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e
#all: $(EBIN_DIR)/rabbit.boot
all: $(TARGETS)
-$(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app
- escript generate_app $(EBIN_DIR) < $< > $@
-
-$(EBIN_DIR)/gen_server2.beam: $(SOURCE_DIR)/gen_server2.erl
+$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl
erlc $(ERLC_OPTS) $<
-
-$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl $(EBIN_DIR)/gen_server2.beam
- erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
-# ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
+# ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) $<
$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH)
$(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) $@
@@ -59,12 +52,12 @@ $(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py
$(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script: $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.rel $(TARGETS)
erl -noshell -eval 'systools:make_script("ebin/rabbit", [{path, ["ebin"]}]), halt().'
-dialyze: $(BEAM_TARGETS)
+dialyze: $(TARGETS)
dialyzer -c $?
clean: cleandb
rm -f $(EBIN_DIR)/*.beam
- rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script
+ rm -f $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script
rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc
rm -f docs/*.[0-9].gz
@@ -128,8 +121,13 @@ srcdist: distclean
>> $(TARGET_SRC_DIR)/INSTALL
cp README.in $(TARGET_SRC_DIR)/README
elinks -dump -no-references -no-numbering $(WEB_URL)build-server.html \
+<<<<<<< /tmp/rabbitmq-server/Makefile
+ >> $(TARGET_SRC_DIR)/README
+ sed -i 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit.app
+=======
>> $(TARGET_SRC_DIR)/BUILD
sed -i.save 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in && rm -f $(TARGET_SRC_DIR)/ebin/rabbit_app.in.save
+>>>>>>> /tmp/Makefile~other.J-SLyR
cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/
cp codegen.py Makefile generate_app $(TARGET_SRC_DIR)
diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod
index 42156896..d0a27a36 100644
--- a/docs/rabbitmqctl.1.pod
+++ b/docs/rabbitmqctl.1.pod
@@ -26,7 +26,7 @@ B<-n> I<node>
startup time). The output of hostname -s is usually the correct
suffix to use after the "@" sign. See rabbitmq-server(1) for
details of configuring the RabbitMQ broker.
-
+
B<-q>
quiet output mode is selected with the B<-q> flag. Informational
messages are suppressed when quiet mode is in effect.
@@ -43,32 +43,32 @@ stop_app
This command is typically run prior to performing other management
actions that require the RabbitMQ application to be stopped,
e.g. I<reset>.
-
+
start_app
start the RabbitMQ application.
This command is typically run prior to performing other management
actions that require the RabbitMQ application to be stopped,
e.g. I<reset>.
-
+
status
display various information about the RabbitMQ broker, such as
whether the RabbitMQ application on the current node, its version
number, what nodes are part of the broker, which of these are
running.
-
+
force
return a RabbitMQ node to its virgin state.
Removes the node from any cluster it belongs to, removes all data
from the management database, such as configured users, vhosts and
deletes all persistent messages.
-
+
force_reset
the same as I<force> command, but resets the node unconditionally,
regardless of the current management database state and cluster
configuration.
It should only be used as a last resort if the database or cluster
configuration has been corrupted.
-
+
rotate_logs [suffix]
instruct the RabbitMQ node to rotate the log files. The RabbitMQ
broker will attempt to append the current contents of the log file
@@ -81,58 +81,53 @@ rotate_logs [suffix]
specified.
This command might be helpful when you are e.g. writing your own
logrotate script and you do not want to restart the RabbitMQ node.
-
+
cluster I<clusternode> ...
instruct the node to become member of a cluster with the specified
nodes determined by I<clusternode> option(s).
See http://www.rabbitmq.com/clustering.html for more information
about clustering.
-
+
=head2 USER MANAGEMENT
-
+
add_user I<username> I<password>
create a user named I<username> with (initial) password I<password>.
-
-delete_user I<username>
- delete the user named I<username>.
-
+
change_password I<username> I<newpassword>
change the password for the user named I<username> to I<newpassword>.
list_users
list all users.
-
+
=head2 ACCESS CONTROL
add_vhost I<vhostpath>
create a new virtual host called I<vhostpath>.
-
+
delete_vhost I<vhostpath>
delete a virtual host I<vhostpath>.
+<<<<<<< local
+ That command deletes also all its exchanges, queues and user mappings.
+=======
That command deletes also all its exchanges, queues and user
mappings.
+>>>>>>> other
list_vhosts
list all virtual hosts.
-
-set_permissions [-p I<vhostpath>] I<username> I<regexp> I<regexp> I<regexp>
- set the permissions for the user named I<username> in the virtual
- host I<vhostpath>, granting 'configure', 'write' and 'read' access
- to resources with names matching the first, second and third
- I<regexp>, respectively.
-
-clear_permissions [-p I<vhostpath>] I<username>
- remove the permissions for the user named I<username> in the
- virtual host I<vhostpath>.
-
-list_permissions [-p I<vhostpath>]
- list all the users and their permissions in the virtual host
+
+map_user_vhost I<username> I<vhostpath>
+ grant the user named I<username> access to the virtual host called
+ I<vhostpath>.
+
+unmap_user_vhost I<username> I<vhostpath>
+ deny the user named I<username> access to the virtual host called
I<vhostpath>.
-list_user_permissions I<username>
- list the permissions of the user named I<username> across all
- virtual hosts.
-
+list_user_vhost I<username>
+ list all the virtual hosts to which the user named I<username> has
+ been granted access.
+
=head2 SERVER STATUS
list_queues [-p I<vhostpath>] [I<queueinfoitem> ...]
@@ -240,7 +235,7 @@ peer_address
peer_port
peer port
-
+
state
connection state (B<pre-init>, B<starting>, B<tuning>, B<opening>,
B<running>, B<closing>, B<closed>)
@@ -274,7 +269,7 @@ send_cnt
send_pend
send queue size
-
+
=back
The list_queues, list_exchanges and list_bindings commands accept an
@@ -288,12 +283,12 @@ Create a user named foo with (initial) password bar at the Erlang node
rabbit@test:
rabbitmqctl -n rabbit@test add_user foo bar
-
+
Grant user named foo access to the virtual host called test at the
default Erlang node:
rabbitmqctl map_user_vhost foo test
-
+
Append the current logs' content to the files with ".1" suffix and reopen
them:
diff --git a/ebin/rabbit.app b/ebin/rabbit.app
new file mode 100644
index 00000000..0d714fdf
--- /dev/null
+++ b/ebin/rabbit.app
@@ -0,0 +1,57 @@
+{application, rabbit, %% -*- erlang -*-
+ [{description, "RabbitMQ"},
+ {id, "RabbitMQ"},
+ {vsn, "%%VERSION%%"},
+ {modules, [buffering_proxy,
+ rabbit_access_control,
+ rabbit_alarm,
+ rabbit_amqqueue,
+ rabbit_amqqueue_process,
+ rabbit_amqqueue_sup,
+ rabbit_binary_generator,
+ rabbit_binary_parser,
+ rabbit_channel,
+ rabbit_control,
+ rabbit,
+ rabbit_error_logger,
+ rabbit_error_logger_file_h,
+ rabbit_exchange,
+ rabbit_framing_channel,
+ rabbit_framing,
+ rabbit_heartbeat,
+ rabbit_load,
+ rabbit_log,
+ rabbit_memsup_linux,
+ rabbit_misc,
+ rabbit_mnesia,
+ rabbit_multi,
+ rabbit_networking,
+ rabbit_node_monitor,
+ rabbit_persister,
+ rabbit_reader,
+ rabbit_router,
+ rabbit_sasl_report_file_h,
+ rabbit_sup,
+ rabbit_tests,
+ rabbit_tracer,
+ rabbit_writer,
+ tcp_acceptor,
+ tcp_acceptor_sup,
+ tcp_client_sup,
+ tcp_listener,
+ tcp_listener_sup]},
+ {registered, [rabbit_amqqueue_sup,
+ rabbit_log,
+ rabbit_node_monitor,
+ rabbit_persister,
+ rabbit_router,
+ rabbit_sup,
+ rabbit_tcp_client_sup]},
+ {applications, [kernel, stdlib, sasl, mnesia, os_mon]},
+ {mod, {rabbit, []}},
+ {env, [{tcp_listeners, [{"0.0.0.0", 5672}]},
+ {extra_startup_steps, []},
+ {default_user, <<"guest">>},
+ {default_pass, <<"guest">>},
+ {default_vhost, <<"/">>},
+ {memory_alarms, auto}]}]}.
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
deleted file mode 100644
index 5be07492..00000000
--- a/ebin/rabbit_app.in
+++ /dev/null
@@ -1,21 +0,0 @@
-{application, rabbit, %% -*- erlang -*-
- [{description, "RabbitMQ"},
- {id, "RabbitMQ"},
- {vsn, "%%VERSION%%"},
- {modules, []},
- {registered, [rabbit_amqqueue_sup,
- rabbit_log,
- rabbit_node_monitor,
- rabbit_persister,
- rabbit_router,
- rabbit_sup,
- rabbit_tcp_client_sup]},
- {applications, [kernel, stdlib, sasl, mnesia, os_mon]},
- {mod, {rabbit, []}},
- {env, [{tcp_listeners, [{"0.0.0.0", 5672}]},
- {extra_startup_steps, []},
- {default_user, <<"guest">>},
- {default_pass, <<"guest">>},
- {default_vhost, <<"/">>},
- {default_permissions, [<<".*">>, <<".*">>, <<".*">>]},
- {memory_alarms, auto}]}]}.
diff --git a/generate_app b/generate_app
deleted file mode 100644
index 62301292..00000000
--- a/generate_app
+++ /dev/null
@@ -1,10 +0,0 @@
-#!/usr/bin/env escript
-%% -*- erlang -*-
-
-main([BeamDir]) ->
- Modules = [list_to_atom(filename:basename(F, ".beam")) ||
- F <- filelib:wildcard("*.beam", BeamDir)],
- {ok, {application, Application, Properties}} = io:read(''),
- NewProperties = lists:keyreplace(modules, 1, Properties,
- {modules, Modules}),
- io:format("~p.", [{application, Application, NewProperties}]).
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 784c21b3..a026602a 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -30,9 +30,7 @@
%%
-record(user, {username, password}).
--record(permission, {configure, write, read}).
-record(user_vhost, {username, virtual_host}).
--record(user_permission, {user_vhost, permission}).
-record(vhost, {virtual_host, dummy}).
@@ -78,7 +76,6 @@
-type(thunk(T) :: fun(() -> T)).
-type(info_key() :: atom()).
-type(info() :: {info_key(), any()}).
--type(regexp() :: binary()).
%% this is really an abstract type, but dialyzer does not support them
-type(guid() :: any()).
@@ -93,10 +90,6 @@
-type(user() ::
#user{username :: username(),
password :: password()}).
--type(permission() ::
- #permission{configure :: regexp(),
- write :: regexp(),
- read :: regexp()}).
-type(amqqueue() ::
#amqqueue{name :: queue_name(),
durable :: bool(),
diff --git a/packaging/windows/rabbitmq-service.pod b/packaging/windows/rabbitmq-service.pod
index 8a2d2e5b..7c4d3ef2 100644
--- a/packaging/windows/rabbitmq-service.pod
+++ b/packaging/windows/rabbitmq-service.pod
@@ -92,10 +92,8 @@ Defaults to 5672.
=head2 ERLANG_SERVICE_MANAGER_PATH
-Defaults to F<C:\Program Files\erl5.5.5\erts-5.5.5\bin>
-(or F<C:\Program Files (x86)\erl5.5.5\erts-5.5.5\bin> for 64-bit
-environments). This is the installation location of the Erlang service
-manager.
+Defaults to F<C:\Program Files\erl5.5.5\erts-5.5.5\bin>. This is
+the installation location of the Erlang service manager.
=head2 CLUSTER_CONFIG_FILE
diff --git a/src/buffering_proxy.erl b/src/buffering_proxy.erl
new file mode 100644
index 00000000..344b719a
--- /dev/null
+++ b/src/buffering_proxy.erl
@@ -0,0 +1,108 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(buffering_proxy).
+
+-export([start_link/2]).
+
+%% internal
+
+-export([mainloop/4, drain/2]).
+-export([proxy_loop/3]).
+
+-define(HIBERNATE_AFTER, 5000).
+
+%%----------------------------------------------------------------------------
+
+start_link(M, A) ->
+ spawn_link(
+ fun () -> process_flag(trap_exit, true),
+ ProxyPid = self(),
+ Ref = make_ref(),
+ Pid = spawn_link(
+ fun () -> ProxyPid ! Ref,
+ mainloop(ProxyPid, Ref, M,
+ M:init(ProxyPid, A)) end),
+ proxy_loop(Ref, Pid, empty)
+ end).
+
+%%----------------------------------------------------------------------------
+
+mainloop(ProxyPid, Ref, M, State) ->
+ NewState =
+ receive
+ {Ref, Messages} ->
+ NewSt =
+ lists:foldl(fun (Msg, S) ->
+ drain(M, M:handle_message(Msg, S))
+ end, State, lists:reverse(Messages)),
+ ProxyPid ! Ref,
+ NewSt;
+ Msg -> M:handle_message(Msg, State)
+ after ?HIBERNATE_AFTER ->
+ erlang:hibernate(?MODULE, mainloop,
+ [ProxyPid, Ref, M, State])
+ end,
+ ?MODULE:mainloop(ProxyPid, Ref, M, NewState).
+
+drain(M, State) ->
+ receive
+ Msg -> ?MODULE:drain(M, M:handle_message(Msg, State))
+ after 0 ->
+ State
+ end.
+
+proxy_loop(Ref, Pid, State) ->
+ receive
+ Ref ->
+ ?MODULE:proxy_loop(
+ Ref, Pid,
+ case State of
+ empty -> waiting;
+ waiting -> exit(duplicate_next);
+ Messages -> Pid ! {Ref, Messages}, empty
+ end);
+ {'EXIT', Pid, Reason} ->
+ exit(Reason);
+ {'EXIT', _, Reason} ->
+ exit(Pid, Reason),
+ ?MODULE:proxy_loop(Ref, Pid, State);
+ Msg ->
+ ?MODULE:proxy_loop(
+ Ref, Pid,
+ case State of
+ empty -> [Msg];
+ waiting -> Pid ! {Ref, [Msg]}, empty;
+ Messages -> [Msg | Messages]
+ end)
+ after ?HIBERNATE_AFTER ->
+ erlang:hibernate(?MODULE, proxy_loop, [Ref, Pid, State])
+ end.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 1ddb5151..c8c814b6 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -75,20 +75,19 @@ start() ->
try
ok = ensure_working_log_handlers(),
ok = rabbit_mnesia:ensure_mnesia_dir(),
- ok = rabbit_misc:start_applications(?APPS)
+ ok = start_applications(?APPS)
after
%%give the error loggers some time to catch up
timer:sleep(100)
end.
stop() ->
- ok = rabbit_misc:stop_applications(?APPS).
+ ok = stop_applications(?APPS).
stop_and_halt() ->
spawn(fun () ->
SleepTime = 1000,
- rabbit_log:info("Stop-and-halt request received; "
- "halting in ~p milliseconds~n",
+ rabbit_log:info("Stop-and-halt request received; halting in ~p milliseconds~n",
[SleepTime]),
timer:sleep(SleepTime),
init:stop()
@@ -110,6 +109,34 @@ rotate_logs(BinarySuffix) ->
%%--------------------------------------------------------------------
+manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) ->
+ Iterate(fun (App, Acc) ->
+ case Do(App) of
+ ok -> [App | Acc];
+ {error, {SkipError, _}} -> Acc;
+ {error, Reason} ->
+ lists:foreach(Undo, Acc),
+ throw({error, {ErrorTag, App, Reason}})
+ end
+ end, [], Apps),
+ ok.
+
+start_applications(Apps) ->
+ manage_applications(fun lists:foldl/3,
+ fun application:start/1,
+ fun application:stop/1,
+ already_started,
+ cannot_start_application,
+ Apps).
+
+stop_applications(Apps) ->
+ manage_applications(fun lists:foldr/3,
+ fun application:stop/1,
+ fun application:start/1,
+ not_started,
+ cannot_stop_application,
+ Apps).
+
start(normal, []) ->
{ok, SupPid} = rabbit_sup:start_link(),
@@ -273,14 +300,9 @@ insert_default_data() ->
{ok, DefaultUser} = application:get_env(default_user),
{ok, DefaultPass} = application:get_env(default_pass),
{ok, DefaultVHost} = application:get_env(default_vhost),
- {ok, [DefaultConfigurePerm, DefaultWritePerm, DefaultReadPerm]} =
- application:get_env(default_permissions),
ok = rabbit_access_control:add_vhost(DefaultVHost),
ok = rabbit_access_control:add_user(DefaultUser, DefaultPass),
- ok = rabbit_access_control:set_permissions(DefaultUser, DefaultVHost,
- DefaultConfigurePerm,
- DefaultWritePerm,
- DefaultReadPerm),
+ ok = rabbit_access_control:map_user_vhost(DefaultUser, DefaultVHost),
ok.
start_builtin_amq_applications() ->
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index e61eb87f..54348d9a 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -45,13 +45,11 @@
-ifdef(use_specs).
--type(permission_atom() :: 'configure' | 'read' | 'write').
-
-spec(check_login/2 :: (binary(), binary()) -> user()).
-spec(user_pass_login/2 :: (username(), password()) -> user()).
--spec(check_vhost_access/2 :: (username(), vhost()) -> 'ok').
+-spec(check_vhost_access/2 :: (user(), vhost()) -> 'ok').
-spec(check_resource_access/3 ::
- (username(), r(atom()), permission_atom()) -> 'ok').
+ (username(), r(atom()), non_neg_integer()) -> 'ok').
-spec(add_user/2 :: (username(), password()) -> 'ok').
-spec(delete_user/1 :: (username()) -> 'ok').
-spec(change_password/2 :: (username(), password()) -> 'ok').
@@ -128,7 +126,7 @@ internal_lookup_vhost_access(Username, VHostPath) ->
end
end).
-check_vhost_access(Username, VHostPath) ->
+check_vhost_access(#user{username = Username}, VHostPath) ->
?LOGDEBUG("Checking VHost access for ~p to ~p~n", [Username, VHostPath]),
case internal_lookup_vhost_access(Username, VHostPath) of
{ok, _R} ->
@@ -139,10 +137,6 @@ check_vhost_access(Username, VHostPath) ->
[VHostPath, Username])
end.
-permission_index(configure) -> #permission.configure;
-permission_index(write) -> #permission.write;
-permission_index(read) -> #permission.read.
-
check_resource_access(Username,
R = #resource{kind = exchange, name = <<"">>},
Permission) ->
@@ -164,7 +158,7 @@ check_resource_access(Username,
[#user_permission{permission = P}] ->
case regexp:match(
binary_to_list(Name),
- binary_to_list(element(permission_index(Permission), P))) of
+ binary_to_list(element(Permission, P))) of
{match, _, _} -> true;
nomatch -> false
end
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 21999f16..73c6e290 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -33,7 +33,7 @@
-behaviour(gen_event).
--export([start/1, stop/0, register/2]).
+-export([start/0, stop/0, maybe_conserve_memory/1]).
-export([init/1, handle_call/2, handle_event/2, handle_info/2,
terminate/2, code_change/3]).
@@ -52,8 +52,13 @@
-type(mfa_tuple() :: {atom(), atom(), list()}).
-spec(start/1 :: (bool() | 'auto') -> 'ok').
-spec(stop/0 :: () -> 'ok').
--spec(register/2 :: (pid(), mfa_tuple()) -> 'ok').
+<<<<<<< local
+-spec(maybe_conserve_memory/1 :: (pid()) -> 'ok').
+=======
+-spec(register/2 :: (pid(), mfa_tuple()) -> 'ok').
+
+>>>>>>> other
-endif.
%%----------------------------------------------------------------------------
@@ -76,10 +81,16 @@ start(MemoryAlarms) ->
stop() ->
ok = alarm_handler:delete_alarm_handler(?MODULE).
+<<<<<<< /tmp/rabbitmq-server/src/rabbit_alarm.erl
+maybe_conserve_memory(QPid) ->
+ gen_event:call(alarm_handler, ?MODULE, {maybe_conserve_memory, QPid}).
+ {register, Pid, HighMemMFA}).
+=======
register(Pid, HighMemMFA) ->
ok = gen_event:call(alarm_handler, ?MODULE,
{register, Pid, HighMemMFA},
infinity).
+>>>>>>> /tmp/rabbit_alarm.erl~other.Lee8ob
%%----------------------------------------------------------------------------
@@ -89,12 +100,9 @@ init([MemoryAlarms]) ->
false -> undefined
end}}.
-handle_call({register, _Pid, _HighMemMFA},
- State = #alarms{alertees = undefined}) ->
- {ok, ok, State};
-handle_call({register, Pid, HighMemMFA},
- State = #alarms{alertees = Alertess}) ->
- _MRef = erlang:monitor(process, Pid),
+handle_call({maybe_conserve_memory, QPid},
+ State = #alarms{system_memory_high_watermark = Conserve}) ->
+ {ok, rabbit_amqqueue:conserve_memory(QPid, Conserve), State};
case State#alarms.system_memory_high_watermark of
true -> {M, F, A} = HighMemMFA,
ok = erlang:apply(M, F, A ++ [Pid, true]);
@@ -102,16 +110,16 @@ handle_call({register, Pid, HighMemMFA},
end,
NewAlertees = dict:store(Pid, HighMemMFA, Alertess),
{ok, ok, State#alarms{alertees = NewAlertees}};
-
+
handle_call(_Request, State) ->
{ok, not_understood, State}.
handle_event({set_alarm, {system_memory_high_watermark, []}}, State) ->
- ok = alert(true, State#alarms.alertees),
+ rabbit_amqqueue:conserve_memory(true),
{ok, State#alarms{system_memory_high_watermark = true}};
handle_event({clear_alarm, system_memory_high_watermark}, State) ->
- ok = alert(false, State#alarms.alertees),
+ rabbit_amqqueue:conserve_memory(false),
{ok, State#alarms{system_memory_high_watermark = false}};
handle_event(_Event, State) ->
@@ -136,7 +144,7 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
start_memsup() ->
- Mod = case os:type() of
+ Mod = case os:type() of
%% memsup doesn't take account of buffers or cache when
%% considering "free" memory - therefore on Linux we can
%% get memory alarms very easily without any pressure
@@ -144,7 +152,7 @@ start_memsup() ->
%% our own simple memory monitor.
%%
{unix, linux} -> rabbit_memsup_linux;
-
+
%% Start memsup programmatically rather than via the
%% rabbitmq-server script. This is not quite the right
%% thing to do as os_mon checks to see if memsup is
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 198e2782..eb076e94 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,11 +31,10 @@
-module(rabbit_amqqueue).
--export([start/0, recover/0, declare/4, delete/3, purge/1]).
--export([internal_declare/2, internal_delete/1]).
+-export([start/0, recover/0, declare/4, delete/3, purge/1, internal_delete/1]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2,
- stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]).
+ stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]).
-export([list/1, info/1, info/2, info_all/1, info_all/2]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
@@ -85,7 +84,7 @@
{'error', 'in_use'} |
{'error', 'not_empty'}).
-spec(purge/1 :: (amqqueue()) -> qlen()).
--spec(deliver/2 :: (pid(), delivery()) -> bool()).
+-spec(deliver/5 :: (bool(), bool(), maybe(txn()), message(), pid()) -> bool()).
-spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok').
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok').
@@ -103,7 +102,6 @@
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
--spec(internal_declare/2 :: (amqqueue(), bool()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
@@ -159,17 +157,11 @@ declare(QueueName, Durable, AutoDelete, Args) ->
auto_delete = AutoDelete,
arguments = Args,
pid = none}),
- internal_declare(Q, true).
-
-internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) ->
case rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] -> ok = store_queue(Q),
- case WantDefaultBinding of
- true -> add_default_binding(Q);
- false -> ok
- end,
+ ok = add_default_binding(Q),
Q;
[ExistingQ] -> ExistingQ
end
@@ -209,7 +201,9 @@ with(Name, F, E) ->
with(Name, F) ->
with(Name, F, fun () -> {error, not_found} end).
with_or_die(Name, F) ->
- with(Name, F, fun () -> rabbit_misc:not_found(Name) end).
+ with(Name, F, fun () -> rabbit_misc:protocol_error(
+ not_found, "no ~s", [rabbit_misc:rs(Name)])
+ end).
list(VHostPath) ->
mnesia:dirty_match_object(
@@ -241,16 +235,13 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity).
-deliver(QPid, #delivery{immediate = true,
- txn = Txn, sender = ChPid, message = Message}) ->
- gen_server2:call(QPid, {deliver_immediately, Txn, Message, ChPid},
- infinity);
-deliver(QPid, #delivery{mandatory = true,
- txn = Txn, sender = ChPid, message = Message}) ->
- gen_server2:call(QPid, {deliver, Txn, Message, ChPid}, infinity),
+deliver(_IsMandatory, true, Txn, Message, QPid) ->
+ gen_server2:call(QPid, {deliver_immediately, Txn, Message}, infinity);
+deliver(true, _IsImmediate, Txn, Message, QPid) ->
+ gen_server2:call(QPid, {deliver, Txn, Message}, infinity),
true;
-deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) ->
- gen_server2:cast(QPid, {deliver, Txn, Message, ChPid}),
+deliver(false, _IsImmediate, Txn, Message, QPid) ->
+ gen_server2:cast(QPid, {deliver, Txn, Message}),
true.
redeliver(QPid, Messages) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index cf0ef44f..c390b2b7 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -53,21 +53,19 @@
has_had_consumers,
next_msg_id,
message_buffer,
- active_consumers,
- blocked_consumers}).
+ round_robin}).
-record(consumer, {tag, ack_required}).
-record(tx, {ch_pid, is_persistent, pending_messages, pending_acks}).
%% These are held in our process dictionary
--record(cr, {consumer_count,
+-record(cr, {consumers,
ch_pid,
limiter_pid,
monitor_ref,
unacked_messages,
is_limit_active,
- txn,
unsent_message_count}).
-define(INFO_KEYS,
@@ -100,8 +98,7 @@ init(Q) ->
has_had_consumers = false,
next_msg_id = 1,
message_buffer = queue:new(),
- active_consumers = queue:new(),
- blocked_consumers = queue:new()}, ?HIBERNATE_AFTER}.
+ round_robin = queue:new()}, ?HIBERNATE_AFTER}.
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
@@ -131,12 +128,11 @@ ch_record(ChPid) ->
case get(Key) of
undefined ->
MonitorRef = erlang:monitor(process, ChPid),
- C = #cr{consumer_count = 0,
+ C = #cr{consumers = [],
ch_pid = ChPid,
monitor_ref = MonitorRef,
unacked_messages = dict:new(),
is_limit_active = false,
- txn = none,
unsent_message_count = 0},
put(Key, C),
C;
@@ -150,7 +146,7 @@ all_ch_record() ->
[C || {{ch, _}, C} <- get()].
is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
- Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT.
+ Limited orelse Count > ?UNSENT_MESSAGE_LIMIT.
ch_record_state_transition(OldCR, NewCR) ->
BlockedOld = is_ch_blocked(OldCR),
@@ -160,25 +156,20 @@ ch_record_state_transition(OldCR, NewCR) ->
true -> ok
end.
-record_current_channel_tx(ChPid, Txn) ->
- %% as a side effect this also starts monitoring the channel (if
- %% that wasn't happening already)
- store_ch_record((ch_record(ChPid))#cr{txn = Txn}).
-
deliver_immediately(Message, Delivered,
State = #q{q = #amqqueue{name = QName},
- active_consumers = ActiveConsumers,
- blocked_consumers = BlockedConsumers,
+ round_robin = RoundRobin,
next_msg_id = NextId}) ->
?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]),
- case queue:out(ActiveConsumers) of
+ case queue:out(RoundRobin) of
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}},
- ActiveConsumersTail} ->
+ RoundRobinTail} ->
C = #cr{limiter_pid = LimiterPid,
unsent_message_count = Count,
unacked_messages = UAM} = ch_record(ChPid),
- case rabbit_limiter:can_send(LimiterPid, self(), AckRequired) of
+ case not(AckRequired) orelse rabbit_limiter:can_send(
+ LimiterPid, self()) of
true ->
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
@@ -190,38 +181,24 @@ deliver_immediately(Message, Delivered,
NewC = C#cr{unsent_message_count = Count + 1,
unacked_messages = NewUAM},
store_ch_record(NewC),
- {NewActiveConsumers, NewBlockedConsumers} =
+ NewConsumers =
case ch_record_state_transition(C, NewC) of
- ok -> {queue:in(QEntry, ActiveConsumersTail),
- BlockedConsumers};
- block ->
- {ActiveConsumers1, BlockedConsumers1} =
- move_consumers(ChPid,
- ActiveConsumersTail,
- BlockedConsumers),
- {ActiveConsumers1,
- queue:in(QEntry, BlockedConsumers1)}
+ ok -> queue:in(QEntry, RoundRobinTail);
+ block -> block_consumers(ChPid, RoundRobinTail)
end,
- {offered, AckRequired,
- State#q{active_consumers = NewActiveConsumers,
- blocked_consumers = NewBlockedConsumers,
- next_msg_id = NextId + 1}};
+ {offered, AckRequired, State#q{round_robin = NewConsumers,
+ next_msg_id = NextId + 1}};
false ->
store_ch_record(C#cr{is_limit_active = true}),
- {NewActiveConsumers, NewBlockedConsumers} =
- move_consumers(ChPid,
- ActiveConsumers,
- BlockedConsumers),
- deliver_immediately(
- Message, Delivered,
- State#q{active_consumers = NewActiveConsumers,
- blocked_consumers = NewBlockedConsumers})
+ NewConsumers = block_consumers(ChPid, RoundRobinTail),
+ deliver_immediately(Message, Delivered,
+ State#q{round_robin = NewConsumers})
end;
{empty, _} ->
{not_offered, State}
end.
-attempt_delivery(none, _ChPid, Message, State) ->
+attempt_delivery(none, Message, State) ->
case deliver_immediately(Message, false, State) of
{offered, false, State1} ->
{true, State1};
@@ -232,13 +209,13 @@ attempt_delivery(none, _ChPid, Message, State) ->
{not_offered, State1} ->
{false, State1}
end;
-attempt_delivery(Txn, ChPid, Message, State) ->
+attempt_delivery(Txn, Message, State) ->
persist_message(Txn, qname(State), Message),
- record_pending_message(Txn, ChPid, Message),
+ record_pending_message(Txn, Message),
{true, State}.
-deliver_or_enqueue(Txn, ChPid, Message, State) ->
- case attempt_delivery(Txn, ChPid, Message, State) of
+deliver_or_enqueue(Txn, Message, State) ->
+ case attempt_delivery(Txn, Message, State) of
{true, NewState} ->
{true, NewState};
{false, NewState} ->
@@ -251,24 +228,22 @@ deliver_or_enqueue_n(Messages, State = #q{message_buffer = MessageBuffer}) ->
run_poke_burst(queue:join(MessageBuffer, queue:from_list(Messages)),
State).
-add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue).
+block_consumers(ChPid, RoundRobin) ->
+ %%?LOGDEBUG("~p Blocking ~p from ~p~n", [self(), ChPid, queue:to_list(RoundRobin)]),
+ queue:from_list(lists:filter(fun ({CP, _}) -> CP /= ChPid end,
+ queue:to_list(RoundRobin))).
+
+unblock_consumers(ChPid, Consumers, RoundRobin) ->
+ %%?LOGDEBUG("Unblocking ~p ~p ~p~n", [ChPid, Consumers, queue:to_list(RoundRobin)]),
+ queue:join(RoundRobin,
+ queue:from_list([{ChPid, Con} || Con <- Consumers])).
-remove_consumer(ChPid, ConsumerTag, Queue) ->
- %% TODO: replace this with queue:filter/2 once we move to R12
+block_consumer(ChPid, ConsumerTag, RoundRobin) ->
+ %%?LOGDEBUG("~p Blocking ~p from ~p~n", [self(), ConsumerTag, queue:to_list(RoundRobin)]),
queue:from_list(lists:filter(
fun ({CP, #consumer{tag = CT}}) ->
(CP /= ChPid) or (CT /= ConsumerTag)
- end, queue:to_list(Queue))).
-
-remove_consumers(ChPid, Queue) ->
- %% TODO: replace this with queue:filter/2 once we move to R12
- queue:from_list(lists:filter(fun ({CP, _}) -> CP /= ChPid end,
- queue:to_list(Queue))).
-
-move_consumers(ChPid, From, To) ->
- {Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end,
- queue:to_list(From)),
- {queue:from_list(Kept), queue:join(To, queue:from_list(Removed))}.
+ end, queue:to_list(RoundRobin))).
possibly_unblock(State, ChPid, Update) ->
case lookup_ch(ChPid) of
@@ -279,48 +254,65 @@ possibly_unblock(State, ChPid, Update) ->
store_ch_record(NewC),
case ch_record_state_transition(C, NewC) of
ok -> State;
- unblock -> {NewBlockedeConsumers, NewActiveConsumers} =
- move_consumers(ChPid,
- State#q.blocked_consumers,
- State#q.active_consumers),
- run_poke_burst(
- State#q{active_consumers = NewActiveConsumers,
- blocked_consumers = NewBlockedeConsumers})
+ unblock -> NewRR = unblock_consumers(ChPid,
+ NewC#cr.consumers,
+ State#q.round_robin),
+ run_poke_burst(State#q{round_robin = NewRR})
end
end.
-should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
-should_auto_delete(#q{has_had_consumers = false}) -> false;
-should_auto_delete(State) -> is_unused(State).
+check_auto_delete(State = #q{q = #amqqueue{auto_delete = false}}) ->
+ {continue, State};
+check_auto_delete(State = #q{has_had_consumers = false}) ->
+ {continue, State};
+check_auto_delete(State = #q{round_robin = RoundRobin}) ->
+ % The clauses above rule out cases where no-one has consumed from
+ % this queue yet, and cases where we are not an auto_delete queue
+ % in any case. Thus it remains to check whether we have any active
+ % listeners at this point.
+ case queue:is_empty(RoundRobin) of
+ true ->
+ % There are no waiting listeners. It's possible that we're
+ % completely unused. Check.
+ case is_unused() of
+ true ->
+ % There are no active consumers at this
+ % point. This is the signal to autodelete.
+ {stop, State};
+ false ->
+ % There is at least one active consumer, so we
+ % shouldn't delete ourselves.
+ {continue, State}
+ end;
+ false ->
+ % There are some waiting listeners, thus we are not
+ % unused, so can continue life as normal without needing
+ % to check the process dictionary.
+ {continue, State}
+ end.
-handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
+handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
+ round_robin = ActiveConsumers}) ->
case lookup_ch(DownPid) of
not_found -> noreply(State);
- #cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn,
- unacked_messages = UAM} ->
+ #cr{monitor_ref = MonitorRef, ch_pid = ChPid, unacked_messages = UAM} ->
+ NewActive = block_consumers(ChPid, ActiveConsumers),
erlang:demonitor(MonitorRef),
erase({ch, ChPid}),
- case Txn of
- none -> ok;
- _ -> ok = rollback_work(Txn, qname(State)),
- erase_tx(Txn)
- end,
- NewState =
- deliver_or_enqueue_n(
- [{Message, true} ||
- {_Messsage_id, Message} <- dict:to_list(UAM)],
- State#q{
- exclusive_consumer = case Holder of
- {ChPid, _} -> none;
- Other -> Other
- end,
- active_consumers = remove_consumers(
- ChPid, State#q.active_consumers),
- blocked_consumers = remove_consumers(
- ChPid, State#q.blocked_consumers)}),
- case should_auto_delete(NewState) of
- false -> noreply(NewState);
- true -> {stop, normal, NewState}
+ case check_auto_delete(
+ deliver_or_enqueue_n(
+ [{Message, true} ||
+ {_Messsage_id, Message} <- dict:to_list(UAM)],
+ State#q{
+ exclusive_consumer = case Holder of
+ {ChPid, _} -> none;
+ Other -> Other
+ end,
+ round_robin = NewActive})) of
+ {continue, NewState} ->
+ noreply(NewState);
+ {stop, NewState} ->
+ {stop, normal, NewState}
end
end.
@@ -333,12 +325,12 @@ check_queue_owner(none, _) -> ok;
check_queue_owner({ReaderPid, _}, ReaderPid) -> ok;
check_queue_owner({_, _}, _) -> mismatch.
-check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) ->
+check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume) ->
in_use;
-check_exclusive_access(none, false, _State) ->
+check_exclusive_access(none, false) ->
ok;
-check_exclusive_access(none, true, State) ->
- case is_unused(State) of
+check_exclusive_access(none, true) ->
+ case is_unused() of
true -> ok;
false -> in_use
end.
@@ -363,8 +355,16 @@ run_poke_burst(MessageBuffer, State) ->
State#q{message_buffer = MessageBuffer}
end.
-is_unused(State) -> queue:is_empty(State#q.active_consumers) andalso
- queue:is_empty(State#q.blocked_consumers).
+is_unused() ->
+ is_unused1(get()).
+
+is_unused1([]) ->
+ true;
+is_unused1([{{ch, _}, #cr{consumers = Consumers}} | _Rest])
+ when Consumers /= [] ->
+ false;
+is_unused1([_ | Rest]) ->
+ is_unused1(Rest).
maybe_send_reply(_ChPid, undefined) -> ok;
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
@@ -456,17 +456,13 @@ is_tx_persistent(Txn) ->
#tx{is_persistent = Res} = lookup_tx(Txn),
Res.
-record_pending_message(Txn, ChPid, Message) ->
+record_pending_message(Txn, Message) ->
Tx = #tx{pending_messages = Pending} = lookup_tx(Txn),
- record_current_channel_tx(ChPid, Txn),
- store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending],
- ch_pid = ChPid}).
+ store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending]}).
record_pending_acks(Txn, ChPid, MsgIds) ->
Tx = #tx{pending_acks = Pending} = lookup_tx(Txn),
- record_current_channel_tx(ChPid, Txn),
- store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending],
- ch_pid = ChPid}).
+ store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending], ch_pid = ChPid}).
process_pending(Txn, State) ->
#tx{ch_pid = ChPid,
@@ -523,8 +519,9 @@ i(messages, State) ->
i(acks_uncommitted, _) ->
lists:sum([length(Pending) ||
#tx{pending_acks = Pending} <- all_tx_record()]);
-i(consumers, State) ->
- queue:len(State#q.active_consumers) + queue:len(State#q.blocked_consumers);
+i(consumers, _) ->
+ lists:sum([length(Consumers) ||
+ #cr{consumers = Consumers} <- all_ch_record()]);
i(transactions, _) ->
length(all_tx_record());
i(memory, _) ->
@@ -544,7 +541,7 @@ handle_call({info, Items}, _From, State) ->
catch Error -> reply({error, Error}, State)
end;
-handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) ->
+handle_call({deliver_immediately, Txn, Message}, _From, State) ->
%% Synchronous, "immediate" delivery mode
%%
%% FIXME: Is this correct semantics?
@@ -558,12 +555,12 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) ->
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State),
+ {Delivered, NewState} = attempt_delivery(Txn, Message, State),
reply(Delivered, NewState);
-handle_call({deliver, Txn, Message, ChPid}, _From, State) ->
+handle_call({deliver, Txn, Message}, _From, State) ->
%% Synchronous, "mandatory" delivery mode
- {Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
+ {Delivered, NewState} = deliver_or_enqueue(Txn, Message, State),
reply(Delivered, NewState);
handle_call({commit, Txn}, From, State) ->
@@ -606,91 +603,78 @@ handle_call({basic_get, ChPid, NoAck}, _From,
handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg},
_From, State = #q{owner = Owner,
- exclusive_consumer = ExistingHolder}) ->
+ exclusive_consumer = ExistingHolder,
+ round_robin = RoundRobin}) ->
case check_queue_owner(Owner, ReaderPid) of
mismatch ->
reply({error, queue_owned_by_another_connection}, State);
ok ->
- case check_exclusive_access(ExistingHolder, ExclusiveConsume,
- State) of
+ case check_exclusive_access(ExistingHolder, ExclusiveConsume) of
in_use ->
reply({error, exclusive_consume_unavailable}, State);
ok ->
- C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
- Consumer = #consumer{tag = ConsumerTag,
- ack_required = not(NoAck)},
- store_ch_record(C#cr{consumer_count = ConsumerCount +1,
+ C = #cr{consumers = Consumers} = ch_record(ChPid),
+ Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)},
+ store_ch_record(C#cr{consumers = [Consumer | Consumers],
limiter_pid = LimiterPid}),
- if ConsumerCount == 0 ->
+ if Consumers == [] ->
ok = rabbit_limiter:register(LimiterPid, self());
true ->
ok
end,
- ExclusiveConsumer =
- if ExclusiveConsume -> {ChPid, ConsumerTag};
- true -> ExistingHolder
- end,
State1 = State#q{has_had_consumers = true,
- exclusive_consumer = ExclusiveConsumer},
+ exclusive_consumer =
+ if
+ ExclusiveConsume -> {ChPid, ConsumerTag};
+ true -> ExistingHolder
+ end,
+ round_robin = queue:in({ChPid, Consumer}, RoundRobin)},
ok = maybe_send_reply(ChPid, OkMsg),
- State2 =
- case is_ch_blocked(C) of
- true -> State1#q{
- blocked_consumers =
- add_consumer(
- ChPid, Consumer,
- State1#q.blocked_consumers)};
- false -> run_poke_burst(
- State1#q{
- active_consumers =
- add_consumer(
- ChPid, Consumer,
- State1#q.active_consumers)})
- end,
- reply(ok, State2)
+ reply(ok, run_poke_burst(State1))
end
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
- State = #q{exclusive_consumer = Holder}) ->
+ State = #q{exclusive_consumer = Holder,
+ round_robin = RoundRobin}) ->
case lookup_ch(ChPid) of
not_found ->
ok = maybe_send_reply(ChPid, OkMsg),
reply(ok, State);
- C = #cr{consumer_count = ConsumerCount, limiter_pid = LimiterPid} ->
- store_ch_record(C#cr{consumer_count = ConsumerCount - 1}),
- if ConsumerCount == 1 ->
+ C = #cr{consumers = Consumers, limiter_pid = LimiterPid} ->
+ NewConsumers = lists:filter
+ (fun (#consumer{tag = CT}) -> CT /= ConsumerTag end,
+ Consumers),
+ store_ch_record(C#cr{consumers = NewConsumers}),
+ if NewConsumers == [] ->
ok = rabbit_limiter:unregister(LimiterPid, self());
true ->
ok
end,
ok = maybe_send_reply(ChPid, OkMsg),
- NewState =
- State#q{exclusive_consumer = cancel_holder(ChPid,
- ConsumerTag,
- Holder),
- active_consumers = remove_consumer(
- ChPid, ConsumerTag,
- State#q.active_consumers),
- blocked_consumers = remove_consumer(
- ChPid, ConsumerTag,
- State#q.blocked_consumers)},
- case should_auto_delete(NewState) of
- false -> reply(ok, NewState);
- true -> {stop, normal, ok, NewState}
+ case check_auto_delete(
+ State#q{exclusive_consumer = cancel_holder(ChPid,
+ ConsumerTag,
+ Holder),
+ round_robin = block_consumer(ChPid,
+ ConsumerTag,
+ RoundRobin)}) of
+ {continue, State1} ->
+ reply(ok, State1);
+ {stop, State1} ->
+ {stop, normal, ok, State1}
end
end;
handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
message_buffer = MessageBuffer,
- active_consumers = ActiveConsumers}) ->
- reply({ok, Name, queue:len(MessageBuffer), queue:len(ActiveConsumers)},
- State);
+ round_robin = RoundRobin}) ->
+ reply({ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State);
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{message_buffer = MessageBuffer}) ->
IsEmpty = queue:is_empty(MessageBuffer),
- IsUnused = is_unused(State),
+ IsUnused = is_unused(),
if
IfEmpty and not(IsEmpty) ->
reply({error, not_empty}, State);
@@ -709,7 +693,7 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
exclusive_consumer = Holder}) ->
case Owner of
none ->
- case check_exclusive_access(Holder, true, State) of
+ case check_exclusive_access(Holder, true) of
in_use ->
%% FIXME: Is this really the right answer? What if
%% an active consumer's reader is actually the
@@ -727,9 +711,9 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
reply(locked, State)
end.
-handle_cast({deliver, Txn, Message, ChPid}, State) ->
+handle_cast({deliver, Txn, Message}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
+ {_Delivered, NewState} = deliver_or_enqueue(Txn, Message, State),
noreply(NewState);
handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
@@ -785,10 +769,10 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
noreply(
possibly_unblock(
State, ChPid,
- fun (C = #cr{consumer_count = ConsumerCount,
+ fun (C = #cr{consumers = Consumers,
limiter_pid = OldLimiterPid,
is_limit_active = Limited}) ->
- if ConsumerCount =/= 0 andalso OldLimiterPid == undefined ->
+ if Consumers =/= [] andalso OldLimiterPid == undefined ->
ok = rabbit_limiter:register(LimiterPid, self());
true ->
ok
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 3089bb62..b2716ec4 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -231,13 +231,13 @@ clear_permission_cache() ->
ok.
check_configure_permitted(Resource, #ch{ username = Username}) ->
- check_resource_access(Username, Resource, configure).
+ check_resource_access(Username, Resource, #permission.configure).
check_write_permitted(Resource, #ch{ username = Username}) ->
- check_resource_access(Username, Resource, write).
+ check_resource_access(Username, Resource, #permission.write).
check_read_permitted(Resource, #ch{ username = Username}) ->
- check_resource_access(Username, Resource, read).
+ check_resource_access(Username, Resource, #permission.read).
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
@@ -306,9 +306,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory,
immediate = Immediate},
- Content, State = #ch{ virtual_host = VHostPath,
- transaction_id = TxnKey,
- writer_pid = WriterPid}) ->
+ Content, State = #ch{ virtual_host = VHostPath}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
@@ -319,30 +317,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
true -> rabbit_guid:guid();
false -> none
end,
- Message = #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
- content = DecodedContent,
- persistent_key = PersistentKey},
- {RoutingRes, DeliveredQPids} =
- rabbit_exchange:publish(
- Exchange,
- rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)),
- case RoutingRes of
- routed ->
- ok;
- unroutable ->
- %% FIXME: 312 should be replaced by the ?NO_ROUTE
- %% definition, when we move to >=0-9
- ok = basic_return(Message, WriterPid, 312, <<"unroutable">>);
- not_delivered ->
- %% FIXME: 313 should be replaced by the ?NO_CONSUMERS
- %% definition, when we move to >=0-9
- ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>)
- end,
- {noreply, case TxnKey of
- none -> State;
- _ -> add_tx_participants(DeliveredQPids, State)
- end};
+ {noreply, publish(Mandatory, Immediate,
+ #basic_message{exchange_name = ExchangeName,
+ routing_key = RoutingKey,
+ content = DecodedContent,
+ persistent_key = PersistentKey},
+ rabbit_exchange:route(Exchange, RoutingKey, DecodedContent), State)};
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
@@ -571,13 +551,6 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
{ok, FoundX} -> FoundX;
{error, not_found} ->
check_name('exchange', ExchangeNameBin),
- case rabbit_misc:r_arg(VHostPath, exchange, Args,
- <<"alternate-exchange">>) of
- undefined -> ok;
- AName -> check_read_permitted(ExchangeName, State),
- check_write_permitted(AName, State),
- ok
- end,
rabbit_exchange:declare(ExchangeName,
CheckedType,
Durable,
@@ -606,7 +579,8 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
check_configure_permitted(ExchangeName, State),
case rabbit_exchange:delete(ExchangeName, IfUnused) of
{error, not_found} ->
- rabbit_misc:not_found(ExchangeName);
+ rabbit_misc:protocol_error(
+ not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]);
{error, in_use} ->
die_precondition_failed(
"~s in use", [rabbit_misc:rs(ExchangeName)]);
@@ -772,9 +746,11 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
check_read_permitted(ExchangeName, State),
case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of
{error, exchange_not_found} ->
- rabbit_misc:not_found(ExchangeName);
+ rabbit_misc:protocol_error(
+ not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]);
{error, queue_not_found} ->
- rabbit_misc:not_found(QueueName);
+ rabbit_misc:protocol_error(
+ not_found, "no ~s", [rabbit_misc:rs(QueueName)]);
{error, exchange_and_queue_not_found} ->
rabbit_misc:protocol_error(
not_found, "no ~s and no ~s", [rabbit_misc:rs(ExchangeName),
@@ -791,6 +767,30 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
ok -> return_ok(State, NoWait, ReturnMethod)
end.
+publish(Mandatory, Immediate, Message, QPids,
+ State = #ch{transaction_id = TxnKey, writer_pid = WriterPid}) ->
+ Handled = deliver(QPids, Mandatory, Immediate, TxnKey,
+ Message, WriterPid),
+ case TxnKey of
+ none -> State;
+ _ -> add_tx_participants(Handled, State)
+ end.
+
+deliver(QPids, Mandatory, Immediate, Txn, Message, WriterPid) ->
+ case rabbit_router:deliver(QPids, Mandatory, Immediate, Txn, Message) of
+ {ok, DeliveredQPids} -> DeliveredQPids;
+ {error, unroutable} ->
+ %% FIXME: 312 should be replaced by the ?NO_ROUTE
+ %% definition, when we move to >=0-9
+ ok = basic_return(Message, WriterPid, 312, <<"unroutable">>),
+ [];
+ {error, not_delivered} ->
+ %% FIXME: 313 should be replaced by the ?NO_CONSUMERS
+ %% definition, when we move to >=0-9
+ ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>),
+ []
+ end.
+
basic_return(#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = Content},
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 6649899a..352d7e75 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -127,10 +127,10 @@ Available commands:
delete_vhost <VHostPath>
list_vhosts
- set_permissions [-p <VHostPath>] <UserName> <Regexp> <Regexp> <Regexp>
- clear_permissions [-p <VHostPath>] <UserName>
- list_permissions [-p <VHostPath>]
- list_user_permissions <UserName>
+ map_user_vhost <UserName> <VHostPath>
+ unmap_user_vhost <UserName> <VHostPath>
+ list_user_vhosts <UserName>
+ list_vhost_users <VHostPath>
list_queues [-p <VHostPath>] [<QueueInfoItem> ...]
list_exchanges [-p <VHostPath>] [<ExchangeInfoItem> ...]
@@ -236,14 +236,25 @@ action(list_vhosts, Node, [], Inform) ->
Inform("Listing vhosts", []),
display_list(call(Node, {rabbit_access_control, list_vhosts, []}));
-action(list_user_permissions, Node, Args = [_Username], Inform) ->
- Inform("Listing permissions for user ~p", Args),
- display_list(call(Node, {rabbit_access_control, list_user_permissions,
- Args}));
+action(map_user_vhost, Node, Args = [_Username, _VHostPath], Inform) ->
+ Inform("Mapping user ~p to vhost ~p", Args),
+ call(Node, {rabbit_access_control, map_user_vhost, Args});
+
+action(unmap_user_vhost, Node, Args = [_Username, _VHostPath], Inform) ->
+ Inform("Unmapping user ~p from vhost ~p", Args),
+ call(Node, {rabbit_access_control, unmap_user_vhost, Args});
+
+action(list_user_vhosts, Node, Args = [_Username], Inform) ->
+ Inform("Listing vhosts for user ~p", Args),
+ display_list(call(Node, {rabbit_access_control, list_user_vhosts, Args}));
+
+action(list_vhost_users, Node, Args = [_VHostPath], Inform) ->
+ Inform("Listing users for vhosts ~p", Args),
+ display_list(call(Node, {rabbit_access_control, list_vhost_users, Args}));
action(list_queues, Node, Args, Inform) ->
Inform("Listing queues", []),
- {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args),
+ {VHostArg, RemainingArgs} = parse_vhost_flag(Args),
ArgAtoms = list_replace(node, pid,
default_if_empty(RemainingArgs, [name, messages])),
display_info_list(rpc_call(Node, rabbit_amqqueue, info_all,
@@ -252,7 +263,7 @@ action(list_queues, Node, Args, Inform) ->
action(list_exchanges, Node, Args, Inform) ->
Inform("Listing exchanges", []),
- {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args),
+ {VHostArg, RemainingArgs} = parse_vhost_flag(Args),
ArgAtoms = default_if_empty(RemainingArgs, [name, type]),
display_info_list(rpc_call(Node, rabbit_exchange, info_all,
[VHostArg, ArgAtoms]),
@@ -260,7 +271,7 @@ action(list_exchanges, Node, Args, Inform) ->
action(list_bindings, Node, Args, Inform) ->
Inform("Listing bindings", []),
- {VHostArg, _} = parse_vhost_flag_bin(Args),
+ {VHostArg, _} = parse_vhost_flag(Args),
InfoKeys = [exchange_name, routing_key, queue_name, args],
display_info_list(
[lists:zip(InfoKeys, tuple_to_list(X)) ||
@@ -274,37 +285,15 @@ action(list_connections, Node, Args, Inform) ->
default_if_empty(Args, [user, peer_address, peer_port])),
display_info_list(rpc_call(Node, rabbit_networking, connection_info_all,
[ArgAtoms]),
- ArgAtoms);
-
-action(Command, Node, Args, Inform) ->
- {VHost, RemainingArgs} = parse_vhost_flag(Args),
- action(Command, Node, VHost, RemainingArgs, Inform).
-
-action(set_permissions, Node, VHost, [Username, CPerm, WPerm, RPerm], Inform) ->
- Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]),
- call(Node, {rabbit_access_control, set_permissions,
- [Username, VHost, CPerm, WPerm, RPerm]});
-
-action(clear_permissions, Node, VHost, [Username], Inform) ->
- Inform("Clearing permissions for user ~p in vhost ~p", [Username, VHost]),
- call(Node, {rabbit_access_control, clear_permissions, [Username, VHost]});
-
-action(list_permissions, Node, VHost, [], Inform) ->
- Inform("Listing permissions in vhost ~p", [VHost]),
- display_list(call(Node, {rabbit_access_control, list_vhost_permissions,
- [VHost]})).
+ ArgAtoms).
parse_vhost_flag(Args) when is_list(Args) ->
- case Args of
- ["-p", VHost | RemainingArgs] ->
- {VHost, RemainingArgs};
- RemainingArgs ->
- {"/", RemainingArgs}
- end.
-
-parse_vhost_flag_bin(Args) ->
- {VHost, RemainingArgs} = parse_vhost_flag(Args),
- {list_to_binary(VHost), RemainingArgs}.
+ case Args of
+ ["-p", VHost | RemainingArgs] ->
+ {list_to_binary(VHost), RemainingArgs};
+ RemainingArgs ->
+ {<<"/">>, RemainingArgs}
+ end.
default_if_empty(List, Default) when is_list(List) ->
if List == [] ->
@@ -314,17 +303,21 @@ default_if_empty(List, Default) when is_list(List) ->
end.
display_info_list(Results, InfoItemKeys) when is_list(Results) ->
- lists:foreach(fun (Result) -> display_row([format_info_item(Result, X) ||
- X <- InfoItemKeys])
- end, Results),
+ lists:foreach(
+ fun (Result) ->
+ io:fwrite(
+ lists:flatten(
+ rabbit_misc:intersperse(
+ "\t",
+ [format_info_item(Result, X) || X <- InfoItemKeys]))),
+ io:nl()
+ end,
+ Results),
ok;
+
display_info_list(Other, _) ->
Other.
-display_row(Row) ->
- io:fwrite(lists:flatten(rabbit_misc:intersperse("\t", Row))),
- io:nl().
-
format_info_item(Items, Key) ->
{value, Info = {Key, Value}} = lists:keysearch(Key, 1, Items),
case Info of
@@ -341,10 +334,8 @@ format_info_item(Items, Key) ->
end.
display_list(L) when is_list(L) ->
- lists:foreach(fun (I) when is_binary(I) ->
- io:format("~s~n", [url_encode(I)]);
- (I) when is_tuple(I) ->
- display_row([url_encode(V) || V <- tuple_to_list(I)])
+ lists:foreach(fun (I) ->
+ io:format("~s~n", [binary_to_list(I)])
end,
lists:sort(L)),
ok;
diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl
index 183b6984..9a9220b5 100644
--- a/src/rabbit_error_logger_file_h.erl
+++ b/src/rabbit_error_logger_file_h.erl
@@ -46,7 +46,7 @@ init({{File, Suffix}, []}) ->
case rabbit_misc:append_file(File, Suffix) of
ok -> ok;
{error, Error} ->
- rabbit_log:error("Failed to append contents of "
+ rabbit_log:error("Failed to append contents of " ++
"log file '~s' to '~s':~n~p~n",
[File, [File, Suffix], Error])
end,
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 7d9948f0..3b6338c7 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -36,7 +36,8 @@
-export([recover/0, declare/5, lookup/1, lookup_or_die/1,
list/1, info/1, info/2, info_all/1, info_all/2,
- publish/2]).
+ simple_publish/6, simple_publish/3,
+ route/3]).
-export([add_binding/4, delete_binding/4, list_bindings/1]).
-export([delete/2]).
-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
@@ -56,6 +57,8 @@
-ifdef(use_specs).
+-type(publish_res() :: {'ok', [pid()]} |
+ not_found() | {'error', 'unroutable' | 'not_delivered'}).
-type(bind_res() :: 'ok' | {'error',
'queue_not_found' |
'exchange_not_found' |
@@ -72,7 +75,11 @@
-spec(info/2 :: (exchange(), [info_key()]) -> [info()]).
-spec(info_all/1 :: (vhost()) -> [[info()]]).
-spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]).
--spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}).
+-spec(simple_publish/6 ::
+ (bool(), bool(), exchange_name(), routing_key(), binary(), binary()) ->
+ publish_res()).
+-spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()).
+-spec(route/3 :: (exchange(), routing_key(), decoded_content()) -> [pid()]).
-spec(add_binding/4 ::
(exchange_name(), queue_name(), routing_key(), amqp_table()) ->
bind_res() | {'error', 'durability_settings_incompatible'}).
@@ -157,7 +164,9 @@ lookup(Name) ->
lookup_or_die(Name) ->
case lookup(Name) of
{ok, X} -> X;
- {error, not_found} -> rabbit_misc:not_found(Name)
+ {error, not_found} ->
+ rabbit_misc:protocol_error(
+ not_found, "no ~s", [rabbit_misc:rs(Name)])
end.
list(VHostPath) ->
@@ -187,41 +196,36 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
-publish(X, Delivery) ->
- publish(X, [], Delivery).
-
-publish(X, Seen, Delivery = #delivery{
- message = #basic_message{routing_key = RK, content = C}}) ->
- case rabbit_router:deliver(route(X, RK, C), Delivery) of
- {_, []} = R ->
- #exchange{name = XName, arguments = Args} = X,
- case rabbit_misc:r_arg(XName, exchange, Args,
- <<"alternate-exchange">>) of
- undefined ->
- R;
- AName ->
- NewSeen = [XName | Seen],
- case lists:member(AName, NewSeen) of
- true ->
- R;
- false ->
- case lookup(AName) of
- {ok, AX} ->
- publish(AX, NewSeen, Delivery);
- {error, not_found} ->
- rabbit_log:warning(
- "alternate exchange for ~s "
- "does not exist: ~s",
- [rabbit_misc:rs(XName),
- rabbit_misc:rs(AName)]),
- R
- end
- end
- end;
- R ->
- R
+%% Usable by Erlang code that wants to publish messages.
+simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin,
+ ContentTypeBin, BodyBin) ->
+ {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
+ Content = #content{class_id = ClassId,
+ properties = #'P_basic'{content_type = ContentTypeBin},
+ properties_bin = none,
+ payload_fragments_rev = [BodyBin]},
+ Message = #basic_message{exchange_name = ExchangeName,
+ routing_key = RoutingKeyBin,
+ content = Content,
+ persistent_key = none},
+ simple_publish(Mandatory, Immediate, Message).
+
+%% Usable by Erlang code that wants to publish messages.
+simple_publish(Mandatory, Immediate,
+ Message = #basic_message{exchange_name = ExchangeName,
+ routing_key = RoutingKey,
+ content = Content}) ->
+ case lookup(ExchangeName) of
+ {ok, Exchange} ->
+ QPids = route(Exchange, RoutingKey, Content),
+ rabbit_router:deliver(QPids, Mandatory, Immediate,
+ none, Message);
+ {error, Error} -> {error, Error}
end.
+sort_arguments(Arguments) ->
+ lists:keysort(1, Arguments).
+
%% return the list of qpids to which a message with a given routing
%% key, sent to a particular exchange, should be delivered.
%%
@@ -248,9 +252,6 @@ route(X = #exchange{type = fanout}, _RoutingKey, _Content) ->
route(X = #exchange{type = direct}, RoutingKey, _Content) ->
match_routing_key(X, RoutingKey).
-sort_arguments(Arguments) ->
- lists:keysort(1, Arguments).
-
%% TODO: Maybe this should be handled by a cursor instead.
%% TODO: This causes a full scan for each entry with the same exchange
match_bindings(#exchange{name = Name}, Match) ->
@@ -346,13 +347,16 @@ exchanges_for_queue(QueueName) ->
sets:from_list(
mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))).
-contains(Table, MatchHead) ->
+has_bindings(ExchangeName) ->
+ MatchHead = #route{binding = #binding{exchange_name = ExchangeName,
+ _ = '_'}},
try
- continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read))
+ continue(mnesia:select(rabbit_route, [{MatchHead, [], ['$_']}],
+ 1, read))
catch exit:{aborted, {badarg, _}} ->
%% work around OTP-7025, which was fixed in R12B-1, by
%% falling back on a less efficient method
- case mnesia:match_object(Table, MatchHead, read) of
+ case mnesia:match_object(rabbit_route, MatchHead, read) of
[] -> false;
[_|_] -> true
end
@@ -382,40 +386,32 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) ->
end).
add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
- binding_action(
- ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X, Q, B) ->
+ call_with_exchange_and_queue(
+ ExchangeName, QueueName,
+ fun (X, Q) ->
if Q#amqqueue.durable and not(X#exchange.durable) ->
{error, durability_settings_incompatible};
- true -> ok = sync_binding(B, Q#amqqueue.durable,
- fun mnesia:write/3)
+ true -> ok = sync_binding(
+ ExchangeName, QueueName, RoutingKey, Arguments,
+ Q#amqqueue.durable, fun mnesia:write/3)
end
end).
delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
- binding_action(
- ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X, Q, B) ->
- case mnesia:match_object(rabbit_route, #route{binding = B},
- write) of
- [] -> {error, binding_not_found};
- _ -> ok = sync_binding(B, Q#amqqueue.durable,
- fun mnesia:delete_object/3),
- maybe_auto_delete(X)
- end
- end).
-
-binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) ->
call_with_exchange_and_queue(
ExchangeName, QueueName,
fun (X, Q) ->
- Fun(X, Q, #binding{exchange_name = ExchangeName,
- queue_name = QueueName,
- key = RoutingKey,
- args = sort_arguments(Arguments)})
+ ok = sync_binding(
+ ExchangeName, QueueName, RoutingKey, Arguments,
+ Q#amqqueue.durable, fun mnesia:delete_object/3),
+ maybe_auto_delete(X)
end).
-sync_binding(Binding, Durable, Fun) ->
+sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) ->
+ Binding = #binding{exchange_name = ExchangeName,
+ queue_name = QueueName,
+ key = RoutingKey,
+ args = sort_arguments(Arguments)},
ok = case Durable of
true -> Fun(rabbit_durable_route,
#route{binding = Binding}, write);
@@ -481,7 +477,7 @@ parse_x_match(Other) ->
%% Horrendous matching algorithm. Depends for its merge-like
%% (linear-time) behaviour on the lists:keysort (sort_arguments) that
-%% route/3 and {add,delete}_binding/4 do.
+%% route/3 and sync_binding/6 do.
%%
%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
@@ -570,11 +566,7 @@ maybe_auto_delete(Exchange = #exchange{auto_delete = true}) ->
ok.
conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
- Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}},
- %% we need to check for durable routes here too in case a bunch of
- %% routes to durable queues have been removed temporarily as a
- %% result of a node failure
- case contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match) of
+ case has_bindings(ExchangeName) of
false -> unconditional_delete(Exchange);
true -> {error, in_use}
end.
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
index 5c447792..060bed48 100644
--- a/src/rabbit_framing_channel.erl
+++ b/src/rabbit_framing_channel.erl
@@ -95,15 +95,13 @@ collect_content(ChannelPid, MethodName) ->
true ->
rabbit_misc:protocol_error(
command_invalid,
- "expected content header for class ~w, "
- "got one for class ~w instead",
+ "expected content header for class ~w, got one for class ~w instead",
[ClassId, HeaderClassId])
end;
_ ->
rabbit_misc:protocol_error(
command_invalid,
- "expected content header for class ~w, "
- "got non content header frame instead",
+ "expected content header for class ~w, got non content header frame instead",
[ClassId])
end.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 72e16f0f..4da247a4 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -51,7 +51,6 @@
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([format_stderr/2]).
--export([start_applications/1, stop_applications/1]).
-import(mnesia).
-import(lists).
@@ -110,9 +109,13 @@
-spec(dirty_dump_log/1 :: (string()) -> 'ok' | {'error', any()}).
-spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}).
-spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok').
+<<<<<<< /tmp/rabbitmq-server/src/rabbit_misc.erl
+-spec(format_stderr/2 :: (string(), [any()]) -> 'true').
+=======
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
-spec(start_applications/1 :: ([atom()]) -> 'ok').
-spec(stop_applications/1 :: ([atom()]) -> 'ok').
+>>>>>>> /tmp/rabbit_misc.erl~other.qjyLOB
-endif.
@@ -249,7 +252,7 @@ filter_exit_map(F, L) ->
with_user(Username, Thunk) ->
fun () ->
- case mnesia:read({rabbit_user, Username}) of
+ case mnesia:read({user, Username}) of
[] ->
mnesia:abort({no_such_user, Username});
[_U] ->
@@ -259,7 +262,7 @@ with_user(Username, Thunk) ->
with_vhost(VHostPath, Thunk) ->
fun () ->
- case mnesia:read({rabbit_vhost, VHostPath}) of
+ case mnesia:read({vhost, VHostPath}) of
[] ->
mnesia:abort({no_such_vhost, VHostPath});
[_V] ->
@@ -389,6 +392,11 @@ ensure_parent_dirs_exist(Filename) ->
end.
format_stderr(Fmt, Args) ->
+<<<<<<< /tmp/rabbitmq-server/src/rabbit_misc.erl
+ Port = open_port({fd, 0, 2}, [out]),
+ port_command(Port, io_lib:format(Fmt, Args)),
+ port_close(Port).
+=======
case os:type() of
{unix, _} ->
Port = open_port({fd, 0, 2}, [out]),
@@ -431,3 +439,4 @@ stop_applications(Apps) ->
cannot_stop_application,
Apps).
+>>>>>>> /tmp/rabbit_misc.erl~other.qjyLOB
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 575ecb0a..0c573073 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -101,50 +101,33 @@ force_reset() -> reset(true).
%%--------------------------------------------------------------------
table_definitions() ->
- [{rabbit_user,
- [{record_name, user},
- {attributes, record_info(fields, user)},
- {disc_copies, [node()]}]},
- {rabbit_user_permission,
- [{record_name, user_permission},
- {attributes, record_info(fields, user_permission)},
- {disc_copies, [node()]}]},
- {rabbit_vhost,
- [{record_name, vhost},
- {attributes, record_info(fields, vhost)},
- {disc_copies, [node()]}]},
- {rabbit_config,
- [{disc_copies, [node()]}]},
- {rabbit_listener,
- [{record_name, listener},
- {attributes, record_info(fields, listener)},
- {type, bag}]},
- {rabbit_durable_route,
- [{record_name, route},
- {attributes, record_info(fields, route)},
- {disc_copies, [node()]}]},
- {rabbit_route,
- [{record_name, route},
- {attributes, record_info(fields, route)},
- {type, ordered_set}]},
- {rabbit_reverse_route,
- [{record_name, reverse_route},
- {attributes, record_info(fields, reverse_route)},
- {type, ordered_set}]},
- {rabbit_durable_exchange,
- [{record_name, exchange},
- {attributes, record_info(fields, exchange)},
- {disc_copies, [node()]}]},
- {rabbit_exchange,
- [{record_name, exchange},
- {attributes, record_info(fields, exchange)}]},
- {rabbit_durable_queue,
- [{record_name, amqqueue},
- {attributes, record_info(fields, amqqueue)},
- {disc_copies, [node()]}]},
- {rabbit_queue,
- [{record_name, amqqueue},
- {attributes, record_info(fields, amqqueue)}]}].
+ [{user, [{disc_copies, [node()]},
+ {attributes, record_info(fields, user)}]},
+ {user_vhost, [{type, bag},
+ {disc_copies, [node()]},
+ {attributes, record_info(fields, user_vhost)},
+ {index, [virtual_host]}]},
+ {vhost, [{disc_copies, [node()]},
+ {attributes, record_info(fields, vhost)}]},
+ {rabbit_config, [{disc_copies, [node()]}]},
+ {listener, [{type, bag},
+ {attributes, record_info(fields, listener)}]},
+ {durable_routes, [{disc_copies, [node()]},
+ {record_name, route},
+ {attributes, record_info(fields, route)}]},
+ {route, [{type, ordered_set},
+ {attributes, record_info(fields, route)}]},
+ {reverse_route, [{type, ordered_set},
+ {attributes, record_info(fields, reverse_route)}]},
+ {durable_exchanges, [{disc_copies, [node()]},
+ {record_name, exchange},
+ {attributes, record_info(fields, exchange)}]},
+ {exchange, [{attributes, record_info(fields, exchange)}]},
+ {durable_queues, [{disc_copies, [node()]},
+ {record_name, amqqueue},
+ {attributes, record_info(fields, amqqueue)}]},
+ {amqqueue, [{attributes, record_info(fields, amqqueue)},
+ {index, [pid]}]}].
table_names() ->
[Tab || {Tab, _} <- table_definitions()].
@@ -263,8 +246,8 @@ init_db(ClusterNodes) ->
%% NB: we cannot use rabbit_log here since
%% it may not have been started yet
error_logger:warning_msg(
- "schema integrity check failed: ~p~n"
- "moving database to backup location "
+ "schema integrity check failed: ~p~n" ++
+ "moving database to backup location " ++
"and recreating schema from scratch~n",
[Reason]),
ok = move_db(),
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 2dbd5a5a..99ea37d8 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -123,7 +123,6 @@ stop_tcp_listener(Host, Port) ->
tcp_listener_started(IPAddress, Port) ->
ok = mnesia:dirty_write(
- rabbit_listener,
#listener{node = node(),
protocol = tcp,
host = tcp_host(IPAddress),
@@ -131,20 +130,19 @@ tcp_listener_started(IPAddress, Port) ->
tcp_listener_stopped(IPAddress, Port) ->
ok = mnesia:dirty_delete_object(
- rabbit_listener,
#listener{node = node(),
protocol = tcp,
host = tcp_host(IPAddress),
port = Port}).
active_listeners() ->
- rabbit_misc:dirty_read_all(rabbit_listener).
+ rabbit_misc:dirty_read_all(listener).
node_listeners(Node) ->
- mnesia:dirty_read(rabbit_listener, Node).
+ mnesia:dirty_read(listener, Node).
on_node_down(Node) ->
- ok = mnesia:dirty_delete(rabbit_listener, Node).
+ ok = mnesia:dirty_delete(listener, Node).
start_client(Sock) ->
{ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []),
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index a67b2edc..985ca3e2 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -105,18 +105,10 @@
%% terminate_channel timeout -> remove 'closing' mark, *closing*
%% handshake_timeout -> ignore, *closing*
%% heartbeat timeout -> *throw*
-%% channel exit with hard error
-%% -> log error, wait for channels to terminate forcefully, start
-%% terminate_connection timer, send close, *closed*
-%% channel exit with soft error
-%% -> log error, start terminate_channel timer, mark channel as
-%% closing
-%% if last channel to exit then send connection.close_ok,
-%% start terminate_connection timer, *closed*
-%% else *closing*
-%% channel exits normally
-%% -> if last channel to exit then send connection.close_ok,
-%% start terminate_connection timer, *closed*
+%% channel exit ->
+%% if abnormal exit then log error
+%% if last channel to exit then send connection.close_ok, start
+%% terminate_connection timer, *closing*
%% closed:
%% socket close -> *terminate*
%% receive connection.close_ok -> self() ! terminate_connection,
@@ -173,8 +165,7 @@ setup_profiling() ->
Value = rabbit_misc:get_config(profiling_enabled, false),
case Value of
once ->
- rabbit_log:info("Enabling profiling for this connection, "
- "and disabling for subsequent.~n"),
+ rabbit_log:info("Enabling profiling for this connection, and disabling for subsequent.~n"),
rabbit_misc:set_config(profiling_enabled, false),
fprof:trace(start);
true ->
@@ -288,8 +279,6 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
exit(Reason);
{'EXIT', _Pid, E = {writer, send_failed, _Error}} ->
throw(E);
- {channel_exit, Channel, Reason} ->
- mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State));
{'EXIT', Pid, Reason} ->
mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State));
{terminate_channel, Channel, Ref1} ->
@@ -357,6 +346,7 @@ terminate_channel(Channel, Ref, State) ->
end,
State.
+<<<<<<< local
handle_channel_exit(Channel, Reason, State) ->
%% We remove the channel from the inbound map only. That allows
%% the channel to be re-opened, but also means the remaining
@@ -365,13 +355,26 @@ handle_channel_exit(Channel, Reason, State) ->
erase({channel, Channel}),
handle_exception(State, Channel, Reason).
+handle_dependent_exit(Pid, Reason,
+ State = #v1{connection_state = closing}) ->
+ case channel_cleanup(Pid) of
+ undefined -> exit({abnormal_dependent_exit, Pid, Reason});
+ Channel ->
+ case Reason of
+ normal -> ok;
+ _ -> log_channel_error(closing, Channel, Reason)
+ end,
+ maybe_close(State)
+ end;
+=======
+>>>>>>> other
handle_dependent_exit(Pid, normal, State) ->
channel_cleanup(Pid),
- maybe_close(State);
+ State;
handle_dependent_exit(Pid, Reason, State) ->
case channel_cleanup(Pid) of
undefined -> exit({abnormal_dependent_exit, Pid, Reason});
- Channel -> maybe_close(handle_exception(State, Channel, Reason))
+ Channel -> handle_exception(State, Channel, Reason)
end.
channel_cleanup(Pid) ->
@@ -419,8 +422,7 @@ wait_for_channel_termination(N, TimerRef) ->
normal -> ok;
_ ->
rabbit_log:error(
- "connection ~p, channel ~p - "
- "error while terminating:~n~p~n",
+ "connection ~p, channel ~p - error while terminating:~n~p~n",
[self(), Channel, Reason])
end,
wait_for_channel_termination(N-1, TimerRef)
@@ -429,15 +431,13 @@ wait_for_channel_termination(N, TimerRef) ->
exit(channel_termination_timeout)
end.
-maybe_close(State = #v1{connection_state = closing}) ->
+maybe_close(State) ->
case all_channels() of
[] -> ok = send_on_channel0(
State#v1.sock, #'connection.close_ok'{}),
close_connection(State);
_ -> State
- end;
-maybe_close(State) ->
- State.
+ end.
handle_frame(Type, 0, Payload, State = #v1{connection_state = CS})
when CS =:= closing; CS =:= closed ->
@@ -725,8 +725,8 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
vhost = VHost}} = State,
WriterPid = rabbit_writer:start(Sock, Channel, FrameMax),
ChPid = rabbit_framing_channel:start_link(
- fun rabbit_channel:start_link/5,
- [Channel, self(), WriterPid, Username, VHost]),
+ fun rabbit_channel:start_link/4,
+ [self(), WriterPid, Username, VHost]),
put({channel, Channel}, {chpid, ChPid}),
put({chpid, ChPid}, {channel, Channel}),
ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame);
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 10f80cc3..0b06a063 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -35,7 +35,7 @@
-behaviour(gen_server2).
-export([start_link/0,
- deliver/2]).
+ deliver/5]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -50,7 +50,8 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
--spec(deliver/2 :: ([pid()], delivery()) -> {routing_result(), [pid()]}).
+-spec(deliver/5 :: ([pid()], bool(), bool(), maybe(txn()), message()) ->
+ {'ok', [pid()]} | {'error', 'unroutable' | 'not_delivered'}).
-endif.
@@ -61,13 +62,13 @@ start_link() ->
-ifdef(BUG19758).
-deliver(QPids, Delivery) ->
- check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
- run_bindings(QPids, Delivery)).
+deliver(QPids, Mandatory, Immediate, Txn, Message) ->
+ check_delivery(Mandatory, Immediate,
+ run_bindings(QPids, Mandatory, Immediate, Txn, Message)).
-else.
-deliver(QPids, Delivery) ->
+deliver(QPids, Mandatory, Immediate, Txn, Message) ->
%% we reduce inter-node traffic by grouping the qpids by node and
%% only delivering one copy of the message to each node involved,
%% which then in turn delivers it to its queues.
@@ -80,14 +81,16 @@ deliver(QPids, Delivery) ->
[QPid], D)
end,
dict:new(), QPids)),
- Delivery).
+ Mandatory, Immediate, Txn, Message).
-deliver_per_node([{Node, QPids}], Delivery) when Node == node() ->
+deliver_per_node([{Node, QPids}], Mandatory, Immediate,
+ Txn, Message)
+ when Node == node() ->
%% optimisation
- check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
- run_bindings(QPids, Delivery));
-deliver_per_node(NodeQPids, Delivery = #delivery{mandatory = false,
- immediate = false}) ->
+ check_delivery(Mandatory, Immediate,
+ run_bindings(QPids, Mandatory, Immediate, Txn, Message));
+deliver_per_node(NodeQPids, Mandatory = false, Immediate = false,
+ Txn, Message) ->
%% optimisation: when Mandatory = false and Immediate = false,
%% rabbit_amqqueue:deliver in run_bindings below will deliver the
%% message to the queue process asynchronously, and return true,
@@ -95,19 +98,22 @@ deliver_per_node(NodeQPids, Delivery = #delivery{mandatory = false,
%% therefore safe to use a fire-and-forget cast here and return
%% the QPids - the semantics is preserved. This scales much better
%% than the non-immediate case below.
- {routed,
- lists:flatmap(
- fun ({Node, QPids}) ->
- gen_server2:cast({?SERVER, Node}, {deliver, QPids, Delivery}),
- QPids
- end,
- NodeQPids)};
-deliver_per_node(NodeQPids, Delivery) ->
+ {ok, lists:flatmap(
+ fun ({Node, QPids}) ->
+ gen_server2:cast(
+ {?SERVER, Node},
+ {deliver, QPids, Mandatory, Immediate, Txn, Message}),
+ QPids
+ end,
+ NodeQPids)};
+deliver_per_node(NodeQPids, Mandatory, Immediate,
+ Txn, Message) ->
R = rabbit_misc:upmap(
fun ({Node, QPids}) ->
- try gen_server2:call({?SERVER, Node},
- {deliver, QPids, Delivery},
- infinity)
+ try gen_server2:call(
+ {?SERVER, Node},
+ {deliver, QPids, Mandatory, Immediate, Txn, Message},
+ infinity)
catch
_Class:_Reason ->
%% TODO: figure out what to log (and do!) here
@@ -124,8 +130,7 @@ deliver_per_node(NodeQPids, Delivery) ->
end,
{false, []},
R),
- check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
- {Routed, lists:append(Handled)}).
+ check_delivery(Mandatory, Immediate, {Routed, lists:append(Handled)}).
-endif.
@@ -134,17 +139,19 @@ deliver_per_node(NodeQPids, Delivery) ->
init([]) ->
{ok, no_state}.
-handle_call({deliver, QPids, Delivery}, From, State) ->
+handle_call({deliver, QPids, Mandatory, Immediate, Txn, Message},
+ From, State) ->
spawn(
fun () ->
- R = run_bindings(QPids, Delivery),
+ R = run_bindings(QPids, Mandatory, Immediate, Txn, Message),
gen_server2:reply(From, R)
end),
{noreply, State}.
-handle_cast({deliver, QPids, Delivery}, State) ->
+handle_cast({deliver, QPids, Mandatory, Immediate, Txn, Message},
+ State) ->
%% in order to preserve message ordering we must not spawn here
- run_bindings(QPids, Delivery),
+ run_bindings(QPids, Mandatory, Immediate, Txn, Message),
{noreply, State}.
handle_info(_Info, State) ->
@@ -158,10 +165,11 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
-run_bindings(QPids, Delivery) ->
+run_bindings(QPids, IsMandatory, IsImmediate, Txn, Message) ->
lists:foldl(
fun (QPid, {Routed, Handled}) ->
- case catch rabbit_amqqueue:deliver(QPid, Delivery) of
+ case catch rabbit_amqqueue:deliver(IsMandatory, IsImmediate,
+ Txn, Message, QPid) of
true -> {true, [QPid | Handled]};
false -> {true, Handled};
{'EXIT', _Reason} -> {Routed, Handled}
@@ -171,6 +179,6 @@ run_bindings(QPids, Delivery) ->
QPids).
%% check_delivery(Mandatory, Immediate, {WasRouted, QPids})
-check_delivery(true, _ , {false, []}) -> {unroutable, []};
-check_delivery(_ , true, {_ , []}) -> {not_delivered, []};
-check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}.
+check_delivery(true, _ , {false, []}) -> {error, unroutable};
+check_delivery(_ , true, {_ , []}) -> {error, not_delivered};
+check_delivery(_ , _ , {_ , Qs}) -> {ok, Qs}.
diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl
index 2a365ce1..9e4c9c8a 100644
--- a/src/rabbit_sasl_report_file_h.erl
+++ b/src/rabbit_sasl_report_file_h.erl
@@ -47,7 +47,7 @@ init({{File, Suffix}, []}) ->
case rabbit_misc:append_file(File, Suffix) of
ok -> ok;
{error, Error} ->
- rabbit_log:error("Failed to append contents of "
+ rabbit_log:error("Failed to append contents of " ++
"sasl log file '~s' to '~s':~n~p~n",
[File, [File, Suffix], Error])
end,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 8f0a3a89..bcfce33a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -507,16 +507,17 @@ test_user_management() ->
{error, {no_such_vhost, _}} =
control_action(delete_vhost, ["/testhost"]),
{error, {no_such_user, _}} =
- control_action(set_permissions, ["foo", ".*", ".*", ".*"]),
+ control_action(map_user_vhost, ["foo", "/"]),
{error, {no_such_user, _}} =
- control_action(clear_permissions, ["foo"]),
+ control_action(unmap_user_vhost, ["foo", "/"]),
{error, {no_such_user, _}} =
- control_action(list_user_permissions, ["foo"]),
+ control_action(list_user_vhosts, ["foo"]),
{error, {no_such_vhost, _}} =
- control_action(list_permissions, ["-p", "/testhost"]),
- {error, {invalid_regexp, _, _}} =
- control_action(set_permissions, ["guest", "+foo", ".*", ".*"]),
-
+ control_action(map_user_vhost, ["guest", "/testhost"]),
+ {error, {no_such_vhost, _}} =
+ control_action(unmap_user_vhost, ["guest", "/testhost"]),
+ {error, {no_such_vhost, _}} =
+ control_action(list_vhost_users, ["/testhost"]),
%% user creation
ok = control_action(add_user, ["foo", "bar"]),
{error, {user_already_exists, _}} =
@@ -531,16 +532,13 @@ test_user_management() ->
ok = control_action(list_vhosts, []),
%% user/vhost mapping
- ok = control_action(set_permissions, ["-p", "/testhost",
- "foo", ".*", ".*", ".*"]),
- ok = control_action(set_permissions, ["-p", "/testhost",
- "foo", ".*", ".*", ".*"]),
- ok = control_action(list_permissions, ["-p", "/testhost"]),
- ok = control_action(list_user_permissions, ["foo"]),
+ ok = control_action(map_user_vhost, ["foo", "/testhost"]),
+ ok = control_action(map_user_vhost, ["foo", "/testhost"]),
+ ok = control_action(list_user_vhosts, ["foo"]),
%% user/vhost unmapping
- ok = control_action(clear_permissions, ["-p", "/testhost", "foo"]),
- ok = control_action(clear_permissions, ["-p", "/testhost", "foo"]),
+ ok = control_action(unmap_user_vhost, ["foo", "/testhost"]),
+ ok = control_action(unmap_user_vhost, ["foo", "/testhost"]),
%% vhost deletion
ok = control_action(delete_vhost, ["/testhost"]),
@@ -549,8 +547,7 @@ test_user_management() ->
%% deleting a populated vhost
ok = control_action(add_vhost, ["/testhost"]),
- ok = control_action(set_permissions, ["-p", "/testhost",
- "foo", ".*", ".*", ".*"]),
+ ok = control_action(map_user_vhost, ["foo", "/testhost"]),
ok = control_action(delete_vhost, ["/testhost"]),
%% user deletion