diff options
author | Vlad Ionescu <vlad@lshift.net> | 2009-06-13 12:25:29 +0100 |
---|---|---|
committer | Vlad Ionescu <vlad@lshift.net> | 2009-06-13 12:25:29 +0100 |
commit | ce13e65faebc930a84147a26a7e3dabc2298c55c (patch) | |
tree | 6492621234b8958cf8bba995d6ea4dab2493a7f8 | |
parent | 35863cd5d7e3b01670b828c7d656d45d5e122c0f (diff) | |
parent | 0b18bf63b8fda7961562cb4b9fb1575eb0f694ef (diff) | |
download | rabbitmq-server-bug19911.tar.gz |
merging bug19911 into junkbug19911
-rw-r--r-- | .hgignore | 1 | ||||
-rw-r--r-- | Makefile | 24 | ||||
-rw-r--r-- | docs/rabbitmqctl.1.pod | 67 | ||||
-rw-r--r-- | ebin/rabbit.app | 57 | ||||
-rw-r--r-- | ebin/rabbit_app.in | 21 | ||||
-rw-r--r-- | generate_app | 10 | ||||
-rw-r--r-- | include/rabbit.hrl | 7 | ||||
-rw-r--r-- | packaging/windows/rabbitmq-service.pod | 6 | ||||
-rw-r--r-- | src/buffering_proxy.erl | 108 | ||||
-rw-r--r-- | src/rabbit.erl | 42 | ||||
-rw-r--r-- | src/rabbit_access_control.erl | 14 | ||||
-rw-r--r-- | src/rabbit_alarm.erl | 34 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 35 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 318 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 80 | ||||
-rw-r--r-- | src/rabbit_control.erl | 93 | ||||
-rw-r--r-- | src/rabbit_error_logger_file_h.erl | 2 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 134 | ||||
-rw-r--r-- | src/rabbit_framing_channel.erl | 6 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 15 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 75 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 8 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 52 | ||||
-rw-r--r-- | src/rabbit_router.erl | 76 | ||||
-rw-r--r-- | src/rabbit_sasl_report_file_h.erl | 2 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 31 |
26 files changed, 705 insertions, 613 deletions
@@ -9,7 +9,6 @@ syntax: regexp ^include/rabbit_framing.hrl$ ^src/rabbit_framing.erl$ ^rabbit.plt$ -^ebin/rabbit.app$ ^packaging/RPMS/Fedora/(BUILD|RPMS|SOURCES|SPECS|SRPMS)$ ^packaging/debs/Debian/rabbitmq-server_.*\.(dsc|(diff|tar)\.gz|deb|changes)$ @@ -7,8 +7,7 @@ SOURCE_DIR=src EBIN_DIR=ebin INCLUDE_DIR=include SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) -BEAM_TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam,$(SOURCES)) -TARGETS=$(EBIN_DIR)/rabbit.app $(BEAM_TARGETS) +TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam,$(SOURCES)) WEB_URL=http://stage.rabbitmq.com/ MANPAGES=$(patsubst %.pod, %.gz, $(wildcard docs/*.[0-9].pod)) @@ -17,7 +16,7 @@ PYTHON=python ifndef USE_SPECS # our type specs rely on features / bug fixes in dialyzer that are # only available in R12B-3 upwards -# +# # NB: the test assumes that version number will only contain single digits USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.6.2" ]; then echo "true"; else echo "false"; fi) endif @@ -40,15 +39,9 @@ ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e #all: $(EBIN_DIR)/rabbit.boot all: $(TARGETS) -$(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app - escript generate_app $(EBIN_DIR) < $< > $@ - -$(EBIN_DIR)/gen_server2.beam: $(SOURCE_DIR)/gen_server2.erl +$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl erlc $(ERLC_OPTS) $< - -$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl $(EBIN_DIR)/gen_server2.beam - erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< -# ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< +# ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) $< $(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) $@ @@ -59,12 +52,12 @@ $(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script: $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.rel $(TARGETS) erl -noshell -eval 'systools:make_script("ebin/rabbit", [{path, ["ebin"]}]), halt().' -dialyze: $(BEAM_TARGETS) +dialyze: $(TARGETS) dialyzer -c $? clean: cleandb rm -f $(EBIN_DIR)/*.beam - rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script + rm -f $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc rm -f docs/*.[0-9].gz @@ -128,8 +121,13 @@ srcdist: distclean >> $(TARGET_SRC_DIR)/INSTALL cp README.in $(TARGET_SRC_DIR)/README elinks -dump -no-references -no-numbering $(WEB_URL)build-server.html \ +<<<<<<< /tmp/rabbitmq-server/Makefile + >> $(TARGET_SRC_DIR)/README + sed -i 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit.app +======= >> $(TARGET_SRC_DIR)/BUILD sed -i.save 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in && rm -f $(TARGET_SRC_DIR)/ebin/rabbit_app.in.save +>>>>>>> /tmp/Makefile~other.J-SLyR cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/ cp codegen.py Makefile generate_app $(TARGET_SRC_DIR) diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod index 42156896..d0a27a36 100644 --- a/docs/rabbitmqctl.1.pod +++ b/docs/rabbitmqctl.1.pod @@ -26,7 +26,7 @@ B<-n> I<node> startup time). The output of hostname -s is usually the correct suffix to use after the "@" sign. See rabbitmq-server(1) for details of configuring the RabbitMQ broker. - + B<-q> quiet output mode is selected with the B<-q> flag. Informational messages are suppressed when quiet mode is in effect. @@ -43,32 +43,32 @@ stop_app This command is typically run prior to performing other management actions that require the RabbitMQ application to be stopped, e.g. I<reset>. - + start_app start the RabbitMQ application. This command is typically run prior to performing other management actions that require the RabbitMQ application to be stopped, e.g. I<reset>. - + status display various information about the RabbitMQ broker, such as whether the RabbitMQ application on the current node, its version number, what nodes are part of the broker, which of these are running. - + force return a RabbitMQ node to its virgin state. Removes the node from any cluster it belongs to, removes all data from the management database, such as configured users, vhosts and deletes all persistent messages. - + force_reset the same as I<force> command, but resets the node unconditionally, regardless of the current management database state and cluster configuration. It should only be used as a last resort if the database or cluster configuration has been corrupted. - + rotate_logs [suffix] instruct the RabbitMQ node to rotate the log files. The RabbitMQ broker will attempt to append the current contents of the log file @@ -81,58 +81,53 @@ rotate_logs [suffix] specified. This command might be helpful when you are e.g. writing your own logrotate script and you do not want to restart the RabbitMQ node. - + cluster I<clusternode> ... instruct the node to become member of a cluster with the specified nodes determined by I<clusternode> option(s). See http://www.rabbitmq.com/clustering.html for more information about clustering. - + =head2 USER MANAGEMENT - + add_user I<username> I<password> create a user named I<username> with (initial) password I<password>. - -delete_user I<username> - delete the user named I<username>. - + change_password I<username> I<newpassword> change the password for the user named I<username> to I<newpassword>. list_users list all users. - + =head2 ACCESS CONTROL add_vhost I<vhostpath> create a new virtual host called I<vhostpath>. - + delete_vhost I<vhostpath> delete a virtual host I<vhostpath>. +<<<<<<< local + That command deletes also all its exchanges, queues and user mappings. +======= That command deletes also all its exchanges, queues and user mappings. +>>>>>>> other list_vhosts list all virtual hosts. - -set_permissions [-p I<vhostpath>] I<username> I<regexp> I<regexp> I<regexp> - set the permissions for the user named I<username> in the virtual - host I<vhostpath>, granting 'configure', 'write' and 'read' access - to resources with names matching the first, second and third - I<regexp>, respectively. - -clear_permissions [-p I<vhostpath>] I<username> - remove the permissions for the user named I<username> in the - virtual host I<vhostpath>. - -list_permissions [-p I<vhostpath>] - list all the users and their permissions in the virtual host + +map_user_vhost I<username> I<vhostpath> + grant the user named I<username> access to the virtual host called + I<vhostpath>. + +unmap_user_vhost I<username> I<vhostpath> + deny the user named I<username> access to the virtual host called I<vhostpath>. -list_user_permissions I<username> - list the permissions of the user named I<username> across all - virtual hosts. - +list_user_vhost I<username> + list all the virtual hosts to which the user named I<username> has + been granted access. + =head2 SERVER STATUS list_queues [-p I<vhostpath>] [I<queueinfoitem> ...] @@ -240,7 +235,7 @@ peer_address peer_port peer port - + state connection state (B<pre-init>, B<starting>, B<tuning>, B<opening>, B<running>, B<closing>, B<closed>) @@ -274,7 +269,7 @@ send_cnt send_pend send queue size - + =back The list_queues, list_exchanges and list_bindings commands accept an @@ -288,12 +283,12 @@ Create a user named foo with (initial) password bar at the Erlang node rabbit@test: rabbitmqctl -n rabbit@test add_user foo bar - + Grant user named foo access to the virtual host called test at the default Erlang node: rabbitmqctl map_user_vhost foo test - + Append the current logs' content to the files with ".1" suffix and reopen them: diff --git a/ebin/rabbit.app b/ebin/rabbit.app new file mode 100644 index 00000000..0d714fdf --- /dev/null +++ b/ebin/rabbit.app @@ -0,0 +1,57 @@ +{application, rabbit, %% -*- erlang -*- + [{description, "RabbitMQ"}, + {id, "RabbitMQ"}, + {vsn, "%%VERSION%%"}, + {modules, [buffering_proxy, + rabbit_access_control, + rabbit_alarm, + rabbit_amqqueue, + rabbit_amqqueue_process, + rabbit_amqqueue_sup, + rabbit_binary_generator, + rabbit_binary_parser, + rabbit_channel, + rabbit_control, + rabbit, + rabbit_error_logger, + rabbit_error_logger_file_h, + rabbit_exchange, + rabbit_framing_channel, + rabbit_framing, + rabbit_heartbeat, + rabbit_load, + rabbit_log, + rabbit_memsup_linux, + rabbit_misc, + rabbit_mnesia, + rabbit_multi, + rabbit_networking, + rabbit_node_monitor, + rabbit_persister, + rabbit_reader, + rabbit_router, + rabbit_sasl_report_file_h, + rabbit_sup, + rabbit_tests, + rabbit_tracer, + rabbit_writer, + tcp_acceptor, + tcp_acceptor_sup, + tcp_client_sup, + tcp_listener, + tcp_listener_sup]}, + {registered, [rabbit_amqqueue_sup, + rabbit_log, + rabbit_node_monitor, + rabbit_persister, + rabbit_router, + rabbit_sup, + rabbit_tcp_client_sup]}, + {applications, [kernel, stdlib, sasl, mnesia, os_mon]}, + {mod, {rabbit, []}}, + {env, [{tcp_listeners, [{"0.0.0.0", 5672}]}, + {extra_startup_steps, []}, + {default_user, <<"guest">>}, + {default_pass, <<"guest">>}, + {default_vhost, <<"/">>}, + {memory_alarms, auto}]}]}. diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in deleted file mode 100644 index 5be07492..00000000 --- a/ebin/rabbit_app.in +++ /dev/null @@ -1,21 +0,0 @@ -{application, rabbit, %% -*- erlang -*- - [{description, "RabbitMQ"}, - {id, "RabbitMQ"}, - {vsn, "%%VERSION%%"}, - {modules, []}, - {registered, [rabbit_amqqueue_sup, - rabbit_log, - rabbit_node_monitor, - rabbit_persister, - rabbit_router, - rabbit_sup, - rabbit_tcp_client_sup]}, - {applications, [kernel, stdlib, sasl, mnesia, os_mon]}, - {mod, {rabbit, []}}, - {env, [{tcp_listeners, [{"0.0.0.0", 5672}]}, - {extra_startup_steps, []}, - {default_user, <<"guest">>}, - {default_pass, <<"guest">>}, - {default_vhost, <<"/">>}, - {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}, - {memory_alarms, auto}]}]}. diff --git a/generate_app b/generate_app deleted file mode 100644 index 62301292..00000000 --- a/generate_app +++ /dev/null @@ -1,10 +0,0 @@ -#!/usr/bin/env escript -%% -*- erlang -*- - -main([BeamDir]) -> - Modules = [list_to_atom(filename:basename(F, ".beam")) || - F <- filelib:wildcard("*.beam", BeamDir)], - {ok, {application, Application, Properties}} = io:read(''), - NewProperties = lists:keyreplace(modules, 1, Properties, - {modules, Modules}), - io:format("~p.", [{application, Application, NewProperties}]). diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 784c21b3..a026602a 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -30,9 +30,7 @@ %% -record(user, {username, password}). --record(permission, {configure, write, read}). -record(user_vhost, {username, virtual_host}). --record(user_permission, {user_vhost, permission}). -record(vhost, {virtual_host, dummy}). @@ -78,7 +76,6 @@ -type(thunk(T) :: fun(() -> T)). -type(info_key() :: atom()). -type(info() :: {info_key(), any()}). --type(regexp() :: binary()). %% this is really an abstract type, but dialyzer does not support them -type(guid() :: any()). @@ -93,10 +90,6 @@ -type(user() :: #user{username :: username(), password :: password()}). --type(permission() :: - #permission{configure :: regexp(), - write :: regexp(), - read :: regexp()}). -type(amqqueue() :: #amqqueue{name :: queue_name(), durable :: bool(), diff --git a/packaging/windows/rabbitmq-service.pod b/packaging/windows/rabbitmq-service.pod index 8a2d2e5b..7c4d3ef2 100644 --- a/packaging/windows/rabbitmq-service.pod +++ b/packaging/windows/rabbitmq-service.pod @@ -92,10 +92,8 @@ Defaults to 5672. =head2 ERLANG_SERVICE_MANAGER_PATH -Defaults to F<C:\Program Files\erl5.5.5\erts-5.5.5\bin> -(or F<C:\Program Files (x86)\erl5.5.5\erts-5.5.5\bin> for 64-bit -environments). This is the installation location of the Erlang service -manager. +Defaults to F<C:\Program Files\erl5.5.5\erts-5.5.5\bin>. This is +the installation location of the Erlang service manager. =head2 CLUSTER_CONFIG_FILE diff --git a/src/buffering_proxy.erl b/src/buffering_proxy.erl new file mode 100644 index 00000000..344b719a --- /dev/null +++ b/src/buffering_proxy.erl @@ -0,0 +1,108 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(buffering_proxy). + +-export([start_link/2]). + +%% internal + +-export([mainloop/4, drain/2]). +-export([proxy_loop/3]). + +-define(HIBERNATE_AFTER, 5000). + +%%---------------------------------------------------------------------------- + +start_link(M, A) -> + spawn_link( + fun () -> process_flag(trap_exit, true), + ProxyPid = self(), + Ref = make_ref(), + Pid = spawn_link( + fun () -> ProxyPid ! Ref, + mainloop(ProxyPid, Ref, M, + M:init(ProxyPid, A)) end), + proxy_loop(Ref, Pid, empty) + end). + +%%---------------------------------------------------------------------------- + +mainloop(ProxyPid, Ref, M, State) -> + NewState = + receive + {Ref, Messages} -> + NewSt = + lists:foldl(fun (Msg, S) -> + drain(M, M:handle_message(Msg, S)) + end, State, lists:reverse(Messages)), + ProxyPid ! Ref, + NewSt; + Msg -> M:handle_message(Msg, State) + after ?HIBERNATE_AFTER -> + erlang:hibernate(?MODULE, mainloop, + [ProxyPid, Ref, M, State]) + end, + ?MODULE:mainloop(ProxyPid, Ref, M, NewState). + +drain(M, State) -> + receive + Msg -> ?MODULE:drain(M, M:handle_message(Msg, State)) + after 0 -> + State + end. + +proxy_loop(Ref, Pid, State) -> + receive + Ref -> + ?MODULE:proxy_loop( + Ref, Pid, + case State of + empty -> waiting; + waiting -> exit(duplicate_next); + Messages -> Pid ! {Ref, Messages}, empty + end); + {'EXIT', Pid, Reason} -> + exit(Reason); + {'EXIT', _, Reason} -> + exit(Pid, Reason), + ?MODULE:proxy_loop(Ref, Pid, State); + Msg -> + ?MODULE:proxy_loop( + Ref, Pid, + case State of + empty -> [Msg]; + waiting -> Pid ! {Ref, [Msg]}, empty; + Messages -> [Msg | Messages] + end) + after ?HIBERNATE_AFTER -> + erlang:hibernate(?MODULE, proxy_loop, [Ref, Pid, State]) + end. diff --git a/src/rabbit.erl b/src/rabbit.erl index 1ddb5151..c8c814b6 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -75,20 +75,19 @@ start() -> try ok = ensure_working_log_handlers(), ok = rabbit_mnesia:ensure_mnesia_dir(), - ok = rabbit_misc:start_applications(?APPS) + ok = start_applications(?APPS) after %%give the error loggers some time to catch up timer:sleep(100) end. stop() -> - ok = rabbit_misc:stop_applications(?APPS). + ok = stop_applications(?APPS). stop_and_halt() -> spawn(fun () -> SleepTime = 1000, - rabbit_log:info("Stop-and-halt request received; " - "halting in ~p milliseconds~n", + rabbit_log:info("Stop-and-halt request received; halting in ~p milliseconds~n", [SleepTime]), timer:sleep(SleepTime), init:stop() @@ -110,6 +109,34 @@ rotate_logs(BinarySuffix) -> %%-------------------------------------------------------------------- +manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) -> + Iterate(fun (App, Acc) -> + case Do(App) of + ok -> [App | Acc]; + {error, {SkipError, _}} -> Acc; + {error, Reason} -> + lists:foreach(Undo, Acc), + throw({error, {ErrorTag, App, Reason}}) + end + end, [], Apps), + ok. + +start_applications(Apps) -> + manage_applications(fun lists:foldl/3, + fun application:start/1, + fun application:stop/1, + already_started, + cannot_start_application, + Apps). + +stop_applications(Apps) -> + manage_applications(fun lists:foldr/3, + fun application:stop/1, + fun application:start/1, + not_started, + cannot_stop_application, + Apps). + start(normal, []) -> {ok, SupPid} = rabbit_sup:start_link(), @@ -273,14 +300,9 @@ insert_default_data() -> {ok, DefaultUser} = application:get_env(default_user), {ok, DefaultPass} = application:get_env(default_pass), {ok, DefaultVHost} = application:get_env(default_vhost), - {ok, [DefaultConfigurePerm, DefaultWritePerm, DefaultReadPerm]} = - application:get_env(default_permissions), ok = rabbit_access_control:add_vhost(DefaultVHost), ok = rabbit_access_control:add_user(DefaultUser, DefaultPass), - ok = rabbit_access_control:set_permissions(DefaultUser, DefaultVHost, - DefaultConfigurePerm, - DefaultWritePerm, - DefaultReadPerm), + ok = rabbit_access_control:map_user_vhost(DefaultUser, DefaultVHost), ok. start_builtin_amq_applications() -> diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index e61eb87f..54348d9a 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -45,13 +45,11 @@ -ifdef(use_specs). --type(permission_atom() :: 'configure' | 'read' | 'write'). - -spec(check_login/2 :: (binary(), binary()) -> user()). -spec(user_pass_login/2 :: (username(), password()) -> user()). --spec(check_vhost_access/2 :: (username(), vhost()) -> 'ok'). +-spec(check_vhost_access/2 :: (user(), vhost()) -> 'ok'). -spec(check_resource_access/3 :: - (username(), r(atom()), permission_atom()) -> 'ok'). + (username(), r(atom()), non_neg_integer()) -> 'ok'). -spec(add_user/2 :: (username(), password()) -> 'ok'). -spec(delete_user/1 :: (username()) -> 'ok'). -spec(change_password/2 :: (username(), password()) -> 'ok'). @@ -128,7 +126,7 @@ internal_lookup_vhost_access(Username, VHostPath) -> end end). -check_vhost_access(Username, VHostPath) -> +check_vhost_access(#user{username = Username}, VHostPath) -> ?LOGDEBUG("Checking VHost access for ~p to ~p~n", [Username, VHostPath]), case internal_lookup_vhost_access(Username, VHostPath) of {ok, _R} -> @@ -139,10 +137,6 @@ check_vhost_access(Username, VHostPath) -> [VHostPath, Username]) end. -permission_index(configure) -> #permission.configure; -permission_index(write) -> #permission.write; -permission_index(read) -> #permission.read. - check_resource_access(Username, R = #resource{kind = exchange, name = <<"">>}, Permission) -> @@ -164,7 +158,7 @@ check_resource_access(Username, [#user_permission{permission = P}] -> case regexp:match( binary_to_list(Name), - binary_to_list(element(permission_index(Permission), P))) of + binary_to_list(element(Permission, P))) of {match, _, _} -> true; nomatch -> false end diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 21999f16..73c6e290 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -33,7 +33,7 @@ -behaviour(gen_event). --export([start/1, stop/0, register/2]). +-export([start/0, stop/0, maybe_conserve_memory/1]). -export([init/1, handle_call/2, handle_event/2, handle_info/2, terminate/2, code_change/3]). @@ -52,8 +52,13 @@ -type(mfa_tuple() :: {atom(), atom(), list()}). -spec(start/1 :: (bool() | 'auto') -> 'ok'). -spec(stop/0 :: () -> 'ok'). --spec(register/2 :: (pid(), mfa_tuple()) -> 'ok'). +<<<<<<< local +-spec(maybe_conserve_memory/1 :: (pid()) -> 'ok'). +======= +-spec(register/2 :: (pid(), mfa_tuple()) -> 'ok'). + +>>>>>>> other -endif. %%---------------------------------------------------------------------------- @@ -76,10 +81,16 @@ start(MemoryAlarms) -> stop() -> ok = alarm_handler:delete_alarm_handler(?MODULE). +<<<<<<< /tmp/rabbitmq-server/src/rabbit_alarm.erl +maybe_conserve_memory(QPid) -> + gen_event:call(alarm_handler, ?MODULE, {maybe_conserve_memory, QPid}). + {register, Pid, HighMemMFA}). +======= register(Pid, HighMemMFA) -> ok = gen_event:call(alarm_handler, ?MODULE, {register, Pid, HighMemMFA}, infinity). +>>>>>>> /tmp/rabbit_alarm.erl~other.Lee8ob %%---------------------------------------------------------------------------- @@ -89,12 +100,9 @@ init([MemoryAlarms]) -> false -> undefined end}}. -handle_call({register, _Pid, _HighMemMFA}, - State = #alarms{alertees = undefined}) -> - {ok, ok, State}; -handle_call({register, Pid, HighMemMFA}, - State = #alarms{alertees = Alertess}) -> - _MRef = erlang:monitor(process, Pid), +handle_call({maybe_conserve_memory, QPid}, + State = #alarms{system_memory_high_watermark = Conserve}) -> + {ok, rabbit_amqqueue:conserve_memory(QPid, Conserve), State}; case State#alarms.system_memory_high_watermark of true -> {M, F, A} = HighMemMFA, ok = erlang:apply(M, F, A ++ [Pid, true]); @@ -102,16 +110,16 @@ handle_call({register, Pid, HighMemMFA}, end, NewAlertees = dict:store(Pid, HighMemMFA, Alertess), {ok, ok, State#alarms{alertees = NewAlertees}}; - + handle_call(_Request, State) -> {ok, not_understood, State}. handle_event({set_alarm, {system_memory_high_watermark, []}}, State) -> - ok = alert(true, State#alarms.alertees), + rabbit_amqqueue:conserve_memory(true), {ok, State#alarms{system_memory_high_watermark = true}}; handle_event({clear_alarm, system_memory_high_watermark}, State) -> - ok = alert(false, State#alarms.alertees), + rabbit_amqqueue:conserve_memory(false), {ok, State#alarms{system_memory_high_watermark = false}}; handle_event(_Event, State) -> @@ -136,7 +144,7 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- start_memsup() -> - Mod = case os:type() of + Mod = case os:type() of %% memsup doesn't take account of buffers or cache when %% considering "free" memory - therefore on Linux we can %% get memory alarms very easily without any pressure @@ -144,7 +152,7 @@ start_memsup() -> %% our own simple memory monitor. %% {unix, linux} -> rabbit_memsup_linux; - + %% Start memsup programmatically rather than via the %% rabbitmq-server script. This is not quite the right %% thing to do as os_mon checks to see if memsup is diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 198e2782..eb076e94 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -31,11 +31,10 @@ -module(rabbit_amqqueue). --export([start/0, recover/0, declare/4, delete/3, purge/1]). --export([internal_declare/2, internal_delete/1]). +-export([start/0, recover/0, declare/4, delete/3, purge/1, internal_delete/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, - stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]). + stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]). -export([list/1, info/1, info/2, info_all/1, info_all/2]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). @@ -85,7 +84,7 @@ {'error', 'in_use'} | {'error', 'not_empty'}). -spec(purge/1 :: (amqqueue()) -> qlen()). --spec(deliver/2 :: (pid(), delivery()) -> bool()). +-spec(deliver/5 :: (bool(), bool(), maybe(txn()), message(), pid()) -> bool()). -spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok'). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok'). @@ -103,7 +102,6 @@ -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). --spec(internal_declare/2 :: (amqqueue(), bool()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). @@ -159,17 +157,11 @@ declare(QueueName, Durable, AutoDelete, Args) -> auto_delete = AutoDelete, arguments = Args, pid = none}), - internal_declare(Q, true). - -internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) -> case rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> ok = store_queue(Q), - case WantDefaultBinding of - true -> add_default_binding(Q); - false -> ok - end, + ok = add_default_binding(Q), Q; [ExistingQ] -> ExistingQ end @@ -209,7 +201,9 @@ with(Name, F, E) -> with(Name, F) -> with(Name, F, fun () -> {error, not_found} end). with_or_die(Name, F) -> - with(Name, F, fun () -> rabbit_misc:not_found(Name) end). + with(Name, F, fun () -> rabbit_misc:protocol_error( + not_found, "no ~s", [rabbit_misc:rs(Name)]) + end). list(VHostPath) -> mnesia:dirty_match_object( @@ -241,16 +235,13 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity). -deliver(QPid, #delivery{immediate = true, - txn = Txn, sender = ChPid, message = Message}) -> - gen_server2:call(QPid, {deliver_immediately, Txn, Message, ChPid}, - infinity); -deliver(QPid, #delivery{mandatory = true, - txn = Txn, sender = ChPid, message = Message}) -> - gen_server2:call(QPid, {deliver, Txn, Message, ChPid}, infinity), +deliver(_IsMandatory, true, Txn, Message, QPid) -> + gen_server2:call(QPid, {deliver_immediately, Txn, Message}, infinity); +deliver(true, _IsImmediate, Txn, Message, QPid) -> + gen_server2:call(QPid, {deliver, Txn, Message}, infinity), true; -deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) -> - gen_server2:cast(QPid, {deliver, Txn, Message, ChPid}), +deliver(false, _IsImmediate, Txn, Message, QPid) -> + gen_server2:cast(QPid, {deliver, Txn, Message}), true. redeliver(QPid, Messages) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index cf0ef44f..c390b2b7 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -53,21 +53,19 @@ has_had_consumers, next_msg_id, message_buffer, - active_consumers, - blocked_consumers}). + round_robin}). -record(consumer, {tag, ack_required}). -record(tx, {ch_pid, is_persistent, pending_messages, pending_acks}). %% These are held in our process dictionary --record(cr, {consumer_count, +-record(cr, {consumers, ch_pid, limiter_pid, monitor_ref, unacked_messages, is_limit_active, - txn, unsent_message_count}). -define(INFO_KEYS, @@ -100,8 +98,7 @@ init(Q) -> has_had_consumers = false, next_msg_id = 1, message_buffer = queue:new(), - active_consumers = queue:new(), - blocked_consumers = queue:new()}, ?HIBERNATE_AFTER}. + round_robin = queue:new()}, ?HIBERNATE_AFTER}. terminate(_Reason, State) -> %% FIXME: How do we cancel active subscriptions? @@ -131,12 +128,11 @@ ch_record(ChPid) -> case get(Key) of undefined -> MonitorRef = erlang:monitor(process, ChPid), - C = #cr{consumer_count = 0, + C = #cr{consumers = [], ch_pid = ChPid, monitor_ref = MonitorRef, unacked_messages = dict:new(), is_limit_active = false, - txn = none, unsent_message_count = 0}, put(Key, C), C; @@ -150,7 +146,7 @@ all_ch_record() -> [C || {{ch, _}, C} <- get()]. is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> - Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT. + Limited orelse Count > ?UNSENT_MESSAGE_LIMIT. ch_record_state_transition(OldCR, NewCR) -> BlockedOld = is_ch_blocked(OldCR), @@ -160,25 +156,20 @@ ch_record_state_transition(OldCR, NewCR) -> true -> ok end. -record_current_channel_tx(ChPid, Txn) -> - %% as a side effect this also starts monitoring the channel (if - %% that wasn't happening already) - store_ch_record((ch_record(ChPid))#cr{txn = Txn}). - deliver_immediately(Message, Delivered, State = #q{q = #amqqueue{name = QName}, - active_consumers = ActiveConsumers, - blocked_consumers = BlockedConsumers, + round_robin = RoundRobin, next_msg_id = NextId}) -> ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]), - case queue:out(ActiveConsumers) of + case queue:out(RoundRobin) of {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}}, - ActiveConsumersTail} -> + RoundRobinTail} -> C = #cr{limiter_pid = LimiterPid, unsent_message_count = Count, unacked_messages = UAM} = ch_record(ChPid), - case rabbit_limiter:can_send(LimiterPid, self(), AckRequired) of + case not(AckRequired) orelse rabbit_limiter:can_send( + LimiterPid, self()) of true -> rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, @@ -190,38 +181,24 @@ deliver_immediately(Message, Delivered, NewC = C#cr{unsent_message_count = Count + 1, unacked_messages = NewUAM}, store_ch_record(NewC), - {NewActiveConsumers, NewBlockedConsumers} = + NewConsumers = case ch_record_state_transition(C, NewC) of - ok -> {queue:in(QEntry, ActiveConsumersTail), - BlockedConsumers}; - block -> - {ActiveConsumers1, BlockedConsumers1} = - move_consumers(ChPid, - ActiveConsumersTail, - BlockedConsumers), - {ActiveConsumers1, - queue:in(QEntry, BlockedConsumers1)} + ok -> queue:in(QEntry, RoundRobinTail); + block -> block_consumers(ChPid, RoundRobinTail) end, - {offered, AckRequired, - State#q{active_consumers = NewActiveConsumers, - blocked_consumers = NewBlockedConsumers, - next_msg_id = NextId + 1}}; + {offered, AckRequired, State#q{round_robin = NewConsumers, + next_msg_id = NextId + 1}}; false -> store_ch_record(C#cr{is_limit_active = true}), - {NewActiveConsumers, NewBlockedConsumers} = - move_consumers(ChPid, - ActiveConsumers, - BlockedConsumers), - deliver_immediately( - Message, Delivered, - State#q{active_consumers = NewActiveConsumers, - blocked_consumers = NewBlockedConsumers}) + NewConsumers = block_consumers(ChPid, RoundRobinTail), + deliver_immediately(Message, Delivered, + State#q{round_robin = NewConsumers}) end; {empty, _} -> {not_offered, State} end. -attempt_delivery(none, _ChPid, Message, State) -> +attempt_delivery(none, Message, State) -> case deliver_immediately(Message, false, State) of {offered, false, State1} -> {true, State1}; @@ -232,13 +209,13 @@ attempt_delivery(none, _ChPid, Message, State) -> {not_offered, State1} -> {false, State1} end; -attempt_delivery(Txn, ChPid, Message, State) -> +attempt_delivery(Txn, Message, State) -> persist_message(Txn, qname(State), Message), - record_pending_message(Txn, ChPid, Message), + record_pending_message(Txn, Message), {true, State}. -deliver_or_enqueue(Txn, ChPid, Message, State) -> - case attempt_delivery(Txn, ChPid, Message, State) of +deliver_or_enqueue(Txn, Message, State) -> + case attempt_delivery(Txn, Message, State) of {true, NewState} -> {true, NewState}; {false, NewState} -> @@ -251,24 +228,22 @@ deliver_or_enqueue_n(Messages, State = #q{message_buffer = MessageBuffer}) -> run_poke_burst(queue:join(MessageBuffer, queue:from_list(Messages)), State). -add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). +block_consumers(ChPid, RoundRobin) -> + %%?LOGDEBUG("~p Blocking ~p from ~p~n", [self(), ChPid, queue:to_list(RoundRobin)]), + queue:from_list(lists:filter(fun ({CP, _}) -> CP /= ChPid end, + queue:to_list(RoundRobin))). + +unblock_consumers(ChPid, Consumers, RoundRobin) -> + %%?LOGDEBUG("Unblocking ~p ~p ~p~n", [ChPid, Consumers, queue:to_list(RoundRobin)]), + queue:join(RoundRobin, + queue:from_list([{ChPid, Con} || Con <- Consumers])). -remove_consumer(ChPid, ConsumerTag, Queue) -> - %% TODO: replace this with queue:filter/2 once we move to R12 +block_consumer(ChPid, ConsumerTag, RoundRobin) -> + %%?LOGDEBUG("~p Blocking ~p from ~p~n", [self(), ConsumerTag, queue:to_list(RoundRobin)]), queue:from_list(lists:filter( fun ({CP, #consumer{tag = CT}}) -> (CP /= ChPid) or (CT /= ConsumerTag) - end, queue:to_list(Queue))). - -remove_consumers(ChPid, Queue) -> - %% TODO: replace this with queue:filter/2 once we move to R12 - queue:from_list(lists:filter(fun ({CP, _}) -> CP /= ChPid end, - queue:to_list(Queue))). - -move_consumers(ChPid, From, To) -> - {Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end, - queue:to_list(From)), - {queue:from_list(Kept), queue:join(To, queue:from_list(Removed))}. + end, queue:to_list(RoundRobin))). possibly_unblock(State, ChPid, Update) -> case lookup_ch(ChPid) of @@ -279,48 +254,65 @@ possibly_unblock(State, ChPid, Update) -> store_ch_record(NewC), case ch_record_state_transition(C, NewC) of ok -> State; - unblock -> {NewBlockedeConsumers, NewActiveConsumers} = - move_consumers(ChPid, - State#q.blocked_consumers, - State#q.active_consumers), - run_poke_burst( - State#q{active_consumers = NewActiveConsumers, - blocked_consumers = NewBlockedeConsumers}) + unblock -> NewRR = unblock_consumers(ChPid, + NewC#cr.consumers, + State#q.round_robin), + run_poke_burst(State#q{round_robin = NewRR}) end end. -should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; -should_auto_delete(#q{has_had_consumers = false}) -> false; -should_auto_delete(State) -> is_unused(State). +check_auto_delete(State = #q{q = #amqqueue{auto_delete = false}}) -> + {continue, State}; +check_auto_delete(State = #q{has_had_consumers = false}) -> + {continue, State}; +check_auto_delete(State = #q{round_robin = RoundRobin}) -> + % The clauses above rule out cases where no-one has consumed from + % this queue yet, and cases where we are not an auto_delete queue + % in any case. Thus it remains to check whether we have any active + % listeners at this point. + case queue:is_empty(RoundRobin) of + true -> + % There are no waiting listeners. It's possible that we're + % completely unused. Check. + case is_unused() of + true -> + % There are no active consumers at this + % point. This is the signal to autodelete. + {stop, State}; + false -> + % There is at least one active consumer, so we + % shouldn't delete ourselves. + {continue, State} + end; + false -> + % There are some waiting listeners, thus we are not + % unused, so can continue life as normal without needing + % to check the process dictionary. + {continue, State} + end. -handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> +handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, + round_robin = ActiveConsumers}) -> case lookup_ch(DownPid) of not_found -> noreply(State); - #cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn, - unacked_messages = UAM} -> + #cr{monitor_ref = MonitorRef, ch_pid = ChPid, unacked_messages = UAM} -> + NewActive = block_consumers(ChPid, ActiveConsumers), erlang:demonitor(MonitorRef), erase({ch, ChPid}), - case Txn of - none -> ok; - _ -> ok = rollback_work(Txn, qname(State)), - erase_tx(Txn) - end, - NewState = - deliver_or_enqueue_n( - [{Message, true} || - {_Messsage_id, Message} <- dict:to_list(UAM)], - State#q{ - exclusive_consumer = case Holder of - {ChPid, _} -> none; - Other -> Other - end, - active_consumers = remove_consumers( - ChPid, State#q.active_consumers), - blocked_consumers = remove_consumers( - ChPid, State#q.blocked_consumers)}), - case should_auto_delete(NewState) of - false -> noreply(NewState); - true -> {stop, normal, NewState} + case check_auto_delete( + deliver_or_enqueue_n( + [{Message, true} || + {_Messsage_id, Message} <- dict:to_list(UAM)], + State#q{ + exclusive_consumer = case Holder of + {ChPid, _} -> none; + Other -> Other + end, + round_robin = NewActive})) of + {continue, NewState} -> + noreply(NewState); + {stop, NewState} -> + {stop, normal, NewState} end end. @@ -333,12 +325,12 @@ check_queue_owner(none, _) -> ok; check_queue_owner({ReaderPid, _}, ReaderPid) -> ok; check_queue_owner({_, _}, _) -> mismatch. -check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) -> +check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume) -> in_use; -check_exclusive_access(none, false, _State) -> +check_exclusive_access(none, false) -> ok; -check_exclusive_access(none, true, State) -> - case is_unused(State) of +check_exclusive_access(none, true) -> + case is_unused() of true -> ok; false -> in_use end. @@ -363,8 +355,16 @@ run_poke_burst(MessageBuffer, State) -> State#q{message_buffer = MessageBuffer} end. -is_unused(State) -> queue:is_empty(State#q.active_consumers) andalso - queue:is_empty(State#q.blocked_consumers). +is_unused() -> + is_unused1(get()). + +is_unused1([]) -> + true; +is_unused1([{{ch, _}, #cr{consumers = Consumers}} | _Rest]) + when Consumers /= [] -> + false; +is_unused1([_ | Rest]) -> + is_unused1(Rest). maybe_send_reply(_ChPid, undefined) -> ok; maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). @@ -456,17 +456,13 @@ is_tx_persistent(Txn) -> #tx{is_persistent = Res} = lookup_tx(Txn), Res. -record_pending_message(Txn, ChPid, Message) -> +record_pending_message(Txn, Message) -> Tx = #tx{pending_messages = Pending} = lookup_tx(Txn), - record_current_channel_tx(ChPid, Txn), - store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending], - ch_pid = ChPid}). + store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending]}). record_pending_acks(Txn, ChPid, MsgIds) -> Tx = #tx{pending_acks = Pending} = lookup_tx(Txn), - record_current_channel_tx(ChPid, Txn), - store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending], - ch_pid = ChPid}). + store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending], ch_pid = ChPid}). process_pending(Txn, State) -> #tx{ch_pid = ChPid, @@ -523,8 +519,9 @@ i(messages, State) -> i(acks_uncommitted, _) -> lists:sum([length(Pending) || #tx{pending_acks = Pending} <- all_tx_record()]); -i(consumers, State) -> - queue:len(State#q.active_consumers) + queue:len(State#q.blocked_consumers); +i(consumers, _) -> + lists:sum([length(Consumers) || + #cr{consumers = Consumers} <- all_ch_record()]); i(transactions, _) -> length(all_tx_record()); i(memory, _) -> @@ -544,7 +541,7 @@ handle_call({info, Items}, _From, State) -> catch Error -> reply({error, Error}, State) end; -handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> +handle_call({deliver_immediately, Txn, Message}, _From, State) -> %% Synchronous, "immediate" delivery mode %% %% FIXME: Is this correct semantics? @@ -558,12 +555,12 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State), + {Delivered, NewState} = attempt_delivery(Txn, Message, State), reply(Delivered, NewState); -handle_call({deliver, Txn, Message, ChPid}, _From, State) -> +handle_call({deliver, Txn, Message}, _From, State) -> %% Synchronous, "mandatory" delivery mode - {Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), + {Delivered, NewState} = deliver_or_enqueue(Txn, Message, State), reply(Delivered, NewState); handle_call({commit, Txn}, From, State) -> @@ -606,91 +603,78 @@ handle_call({basic_get, ChPid, NoAck}, _From, handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, _From, State = #q{owner = Owner, - exclusive_consumer = ExistingHolder}) -> + exclusive_consumer = ExistingHolder, + round_robin = RoundRobin}) -> case check_queue_owner(Owner, ReaderPid) of mismatch -> reply({error, queue_owned_by_another_connection}, State); ok -> - case check_exclusive_access(ExistingHolder, ExclusiveConsume, - State) of + case check_exclusive_access(ExistingHolder, ExclusiveConsume) of in_use -> reply({error, exclusive_consume_unavailable}, State); ok -> - C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), - Consumer = #consumer{tag = ConsumerTag, - ack_required = not(NoAck)}, - store_ch_record(C#cr{consumer_count = ConsumerCount +1, + C = #cr{consumers = Consumers} = ch_record(ChPid), + Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)}, + store_ch_record(C#cr{consumers = [Consumer | Consumers], limiter_pid = LimiterPid}), - if ConsumerCount == 0 -> + if Consumers == [] -> ok = rabbit_limiter:register(LimiterPid, self()); true -> ok end, - ExclusiveConsumer = - if ExclusiveConsume -> {ChPid, ConsumerTag}; - true -> ExistingHolder - end, State1 = State#q{has_had_consumers = true, - exclusive_consumer = ExclusiveConsumer}, + exclusive_consumer = + if + ExclusiveConsume -> {ChPid, ConsumerTag}; + true -> ExistingHolder + end, + round_robin = queue:in({ChPid, Consumer}, RoundRobin)}, ok = maybe_send_reply(ChPid, OkMsg), - State2 = - case is_ch_blocked(C) of - true -> State1#q{ - blocked_consumers = - add_consumer( - ChPid, Consumer, - State1#q.blocked_consumers)}; - false -> run_poke_burst( - State1#q{ - active_consumers = - add_consumer( - ChPid, Consumer, - State1#q.active_consumers)}) - end, - reply(ok, State2) + reply(ok, run_poke_burst(State1)) end end; handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, - State = #q{exclusive_consumer = Holder}) -> + State = #q{exclusive_consumer = Holder, + round_robin = RoundRobin}) -> case lookup_ch(ChPid) of not_found -> ok = maybe_send_reply(ChPid, OkMsg), reply(ok, State); - C = #cr{consumer_count = ConsumerCount, limiter_pid = LimiterPid} -> - store_ch_record(C#cr{consumer_count = ConsumerCount - 1}), - if ConsumerCount == 1 -> + C = #cr{consumers = Consumers, limiter_pid = LimiterPid} -> + NewConsumers = lists:filter + (fun (#consumer{tag = CT}) -> CT /= ConsumerTag end, + Consumers), + store_ch_record(C#cr{consumers = NewConsumers}), + if NewConsumers == [] -> ok = rabbit_limiter:unregister(LimiterPid, self()); true -> ok end, ok = maybe_send_reply(ChPid, OkMsg), - NewState = - State#q{exclusive_consumer = cancel_holder(ChPid, - ConsumerTag, - Holder), - active_consumers = remove_consumer( - ChPid, ConsumerTag, - State#q.active_consumers), - blocked_consumers = remove_consumer( - ChPid, ConsumerTag, - State#q.blocked_consumers)}, - case should_auto_delete(NewState) of - false -> reply(ok, NewState); - true -> {stop, normal, ok, NewState} + case check_auto_delete( + State#q{exclusive_consumer = cancel_holder(ChPid, + ConsumerTag, + Holder), + round_robin = block_consumer(ChPid, + ConsumerTag, + RoundRobin)}) of + {continue, State1} -> + reply(ok, State1); + {stop, State1} -> + {stop, normal, ok, State1} end end; handle_call(stat, _From, State = #q{q = #amqqueue{name = Name}, message_buffer = MessageBuffer, - active_consumers = ActiveConsumers}) -> - reply({ok, Name, queue:len(MessageBuffer), queue:len(ActiveConsumers)}, - State); + round_robin = RoundRobin}) -> + reply({ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State); handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{message_buffer = MessageBuffer}) -> IsEmpty = queue:is_empty(MessageBuffer), - IsUnused = is_unused(State), + IsUnused = is_unused(), if IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State); @@ -709,7 +693,7 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, exclusive_consumer = Holder}) -> case Owner of none -> - case check_exclusive_access(Holder, true, State) of + case check_exclusive_access(Holder, true) of in_use -> %% FIXME: Is this really the right answer? What if %% an active consumer's reader is actually the @@ -727,9 +711,9 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, reply(locked, State) end. -handle_cast({deliver, Txn, Message, ChPid}, State) -> +handle_cast({deliver, Txn, Message}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), + {_Delivered, NewState} = deliver_or_enqueue(Txn, Message, State), noreply(NewState); handle_cast({ack, Txn, MsgIds, ChPid}, State) -> @@ -785,10 +769,10 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> noreply( possibly_unblock( State, ChPid, - fun (C = #cr{consumer_count = ConsumerCount, + fun (C = #cr{consumers = Consumers, limiter_pid = OldLimiterPid, is_limit_active = Limited}) -> - if ConsumerCount =/= 0 andalso OldLimiterPid == undefined -> + if Consumers =/= [] andalso OldLimiterPid == undefined -> ok = rabbit_limiter:register(LimiterPid, self()); true -> ok diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 3089bb62..b2716ec4 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -231,13 +231,13 @@ clear_permission_cache() -> ok. check_configure_permitted(Resource, #ch{ username = Username}) -> - check_resource_access(Username, Resource, configure). + check_resource_access(Username, Resource, #permission.configure). check_write_permitted(Resource, #ch{ username = Username}) -> - check_resource_access(Username, Resource, write). + check_resource_access(Username, Resource, #permission.write). check_read_permitted(Resource, #ch{ username = Username}) -> - check_resource_access(Username, Resource, read). + check_resource_access(Username, Resource, #permission.read). expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( @@ -306,9 +306,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory, immediate = Immediate}, - Content, State = #ch{ virtual_host = VHostPath, - transaction_id = TxnKey, - writer_pid = WriterPid}) -> + Content, State = #ch{ virtual_host = VHostPath}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), @@ -319,30 +317,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, true -> rabbit_guid:guid(); false -> none end, - Message = #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, - content = DecodedContent, - persistent_key = PersistentKey}, - {RoutingRes, DeliveredQPids} = - rabbit_exchange:publish( - Exchange, - rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)), - case RoutingRes of - routed -> - ok; - unroutable -> - %% FIXME: 312 should be replaced by the ?NO_ROUTE - %% definition, when we move to >=0-9 - ok = basic_return(Message, WriterPid, 312, <<"unroutable">>); - not_delivered -> - %% FIXME: 313 should be replaced by the ?NO_CONSUMERS - %% definition, when we move to >=0-9 - ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>) - end, - {noreply, case TxnKey of - none -> State; - _ -> add_tx_participants(DeliveredQPids, State) - end}; + {noreply, publish(Mandatory, Immediate, + #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKey, + content = DecodedContent, + persistent_key = PersistentKey}, + rabbit_exchange:route(Exchange, RoutingKey, DecodedContent), State)}; handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, @@ -571,13 +551,6 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, {ok, FoundX} -> FoundX; {error, not_found} -> check_name('exchange', ExchangeNameBin), - case rabbit_misc:r_arg(VHostPath, exchange, Args, - <<"alternate-exchange">>) of - undefined -> ok; - AName -> check_read_permitted(ExchangeName, State), - check_write_permitted(AName, State), - ok - end, rabbit_exchange:declare(ExchangeName, CheckedType, Durable, @@ -606,7 +579,8 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, check_configure_permitted(ExchangeName, State), case rabbit_exchange:delete(ExchangeName, IfUnused) of {error, not_found} -> - rabbit_misc:not_found(ExchangeName); + rabbit_misc:protocol_error( + not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]); {error, in_use} -> die_precondition_failed( "~s in use", [rabbit_misc:rs(ExchangeName)]); @@ -772,9 +746,11 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, check_read_permitted(ExchangeName, State), case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of {error, exchange_not_found} -> - rabbit_misc:not_found(ExchangeName); + rabbit_misc:protocol_error( + not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]); {error, queue_not_found} -> - rabbit_misc:not_found(QueueName); + rabbit_misc:protocol_error( + not_found, "no ~s", [rabbit_misc:rs(QueueName)]); {error, exchange_and_queue_not_found} -> rabbit_misc:protocol_error( not_found, "no ~s and no ~s", [rabbit_misc:rs(ExchangeName), @@ -791,6 +767,30 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, ok -> return_ok(State, NoWait, ReturnMethod) end. +publish(Mandatory, Immediate, Message, QPids, + State = #ch{transaction_id = TxnKey, writer_pid = WriterPid}) -> + Handled = deliver(QPids, Mandatory, Immediate, TxnKey, + Message, WriterPid), + case TxnKey of + none -> State; + _ -> add_tx_participants(Handled, State) + end. + +deliver(QPids, Mandatory, Immediate, Txn, Message, WriterPid) -> + case rabbit_router:deliver(QPids, Mandatory, Immediate, Txn, Message) of + {ok, DeliveredQPids} -> DeliveredQPids; + {error, unroutable} -> + %% FIXME: 312 should be replaced by the ?NO_ROUTE + %% definition, when we move to >=0-9 + ok = basic_return(Message, WriterPid, 312, <<"unroutable">>), + []; + {error, not_delivered} -> + %% FIXME: 313 should be replaced by the ?NO_CONSUMERS + %% definition, when we move to >=0-9 + ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>), + [] + end. + basic_return(#basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = Content}, diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 6649899a..352d7e75 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -127,10 +127,10 @@ Available commands: delete_vhost <VHostPath> list_vhosts - set_permissions [-p <VHostPath>] <UserName> <Regexp> <Regexp> <Regexp> - clear_permissions [-p <VHostPath>] <UserName> - list_permissions [-p <VHostPath>] - list_user_permissions <UserName> + map_user_vhost <UserName> <VHostPath> + unmap_user_vhost <UserName> <VHostPath> + list_user_vhosts <UserName> + list_vhost_users <VHostPath> list_queues [-p <VHostPath>] [<QueueInfoItem> ...] list_exchanges [-p <VHostPath>] [<ExchangeInfoItem> ...] @@ -236,14 +236,25 @@ action(list_vhosts, Node, [], Inform) -> Inform("Listing vhosts", []), display_list(call(Node, {rabbit_access_control, list_vhosts, []})); -action(list_user_permissions, Node, Args = [_Username], Inform) -> - Inform("Listing permissions for user ~p", Args), - display_list(call(Node, {rabbit_access_control, list_user_permissions, - Args})); +action(map_user_vhost, Node, Args = [_Username, _VHostPath], Inform) -> + Inform("Mapping user ~p to vhost ~p", Args), + call(Node, {rabbit_access_control, map_user_vhost, Args}); + +action(unmap_user_vhost, Node, Args = [_Username, _VHostPath], Inform) -> + Inform("Unmapping user ~p from vhost ~p", Args), + call(Node, {rabbit_access_control, unmap_user_vhost, Args}); + +action(list_user_vhosts, Node, Args = [_Username], Inform) -> + Inform("Listing vhosts for user ~p", Args), + display_list(call(Node, {rabbit_access_control, list_user_vhosts, Args})); + +action(list_vhost_users, Node, Args = [_VHostPath], Inform) -> + Inform("Listing users for vhosts ~p", Args), + display_list(call(Node, {rabbit_access_control, list_vhost_users, Args})); action(list_queues, Node, Args, Inform) -> Inform("Listing queues", []), - {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args), + {VHostArg, RemainingArgs} = parse_vhost_flag(Args), ArgAtoms = list_replace(node, pid, default_if_empty(RemainingArgs, [name, messages])), display_info_list(rpc_call(Node, rabbit_amqqueue, info_all, @@ -252,7 +263,7 @@ action(list_queues, Node, Args, Inform) -> action(list_exchanges, Node, Args, Inform) -> Inform("Listing exchanges", []), - {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args), + {VHostArg, RemainingArgs} = parse_vhost_flag(Args), ArgAtoms = default_if_empty(RemainingArgs, [name, type]), display_info_list(rpc_call(Node, rabbit_exchange, info_all, [VHostArg, ArgAtoms]), @@ -260,7 +271,7 @@ action(list_exchanges, Node, Args, Inform) -> action(list_bindings, Node, Args, Inform) -> Inform("Listing bindings", []), - {VHostArg, _} = parse_vhost_flag_bin(Args), + {VHostArg, _} = parse_vhost_flag(Args), InfoKeys = [exchange_name, routing_key, queue_name, args], display_info_list( [lists:zip(InfoKeys, tuple_to_list(X)) || @@ -274,37 +285,15 @@ action(list_connections, Node, Args, Inform) -> default_if_empty(Args, [user, peer_address, peer_port])), display_info_list(rpc_call(Node, rabbit_networking, connection_info_all, [ArgAtoms]), - ArgAtoms); - -action(Command, Node, Args, Inform) -> - {VHost, RemainingArgs} = parse_vhost_flag(Args), - action(Command, Node, VHost, RemainingArgs, Inform). - -action(set_permissions, Node, VHost, [Username, CPerm, WPerm, RPerm], Inform) -> - Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]), - call(Node, {rabbit_access_control, set_permissions, - [Username, VHost, CPerm, WPerm, RPerm]}); - -action(clear_permissions, Node, VHost, [Username], Inform) -> - Inform("Clearing permissions for user ~p in vhost ~p", [Username, VHost]), - call(Node, {rabbit_access_control, clear_permissions, [Username, VHost]}); - -action(list_permissions, Node, VHost, [], Inform) -> - Inform("Listing permissions in vhost ~p", [VHost]), - display_list(call(Node, {rabbit_access_control, list_vhost_permissions, - [VHost]})). + ArgAtoms). parse_vhost_flag(Args) when is_list(Args) -> - case Args of - ["-p", VHost | RemainingArgs] -> - {VHost, RemainingArgs}; - RemainingArgs -> - {"/", RemainingArgs} - end. - -parse_vhost_flag_bin(Args) -> - {VHost, RemainingArgs} = parse_vhost_flag(Args), - {list_to_binary(VHost), RemainingArgs}. + case Args of + ["-p", VHost | RemainingArgs] -> + {list_to_binary(VHost), RemainingArgs}; + RemainingArgs -> + {<<"/">>, RemainingArgs} + end. default_if_empty(List, Default) when is_list(List) -> if List == [] -> @@ -314,17 +303,21 @@ default_if_empty(List, Default) when is_list(List) -> end. display_info_list(Results, InfoItemKeys) when is_list(Results) -> - lists:foreach(fun (Result) -> display_row([format_info_item(Result, X) || - X <- InfoItemKeys]) - end, Results), + lists:foreach( + fun (Result) -> + io:fwrite( + lists:flatten( + rabbit_misc:intersperse( + "\t", + [format_info_item(Result, X) || X <- InfoItemKeys]))), + io:nl() + end, + Results), ok; + display_info_list(Other, _) -> Other. -display_row(Row) -> - io:fwrite(lists:flatten(rabbit_misc:intersperse("\t", Row))), - io:nl(). - format_info_item(Items, Key) -> {value, Info = {Key, Value}} = lists:keysearch(Key, 1, Items), case Info of @@ -341,10 +334,8 @@ format_info_item(Items, Key) -> end. display_list(L) when is_list(L) -> - lists:foreach(fun (I) when is_binary(I) -> - io:format("~s~n", [url_encode(I)]); - (I) when is_tuple(I) -> - display_row([url_encode(V) || V <- tuple_to_list(I)]) + lists:foreach(fun (I) -> + io:format("~s~n", [binary_to_list(I)]) end, lists:sort(L)), ok; diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl index 183b6984..9a9220b5 100644 --- a/src/rabbit_error_logger_file_h.erl +++ b/src/rabbit_error_logger_file_h.erl @@ -46,7 +46,7 @@ init({{File, Suffix}, []}) -> case rabbit_misc:append_file(File, Suffix) of ok -> ok; {error, Error} -> - rabbit_log:error("Failed to append contents of " + rabbit_log:error("Failed to append contents of " ++ "log file '~s' to '~s':~n~p~n", [File, [File, Suffix], Error]) end, diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 7d9948f0..3b6338c7 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -36,7 +36,8 @@ -export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info/1, info/2, info_all/1, info_all/2, - publish/2]). + simple_publish/6, simple_publish/3, + route/3]). -export([add_binding/4, delete_binding/4, list_bindings/1]). -export([delete/2]). -export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). @@ -56,6 +57,8 @@ -ifdef(use_specs). +-type(publish_res() :: {'ok', [pid()]} | + not_found() | {'error', 'unroutable' | 'not_delivered'}). -type(bind_res() :: 'ok' | {'error', 'queue_not_found' | 'exchange_not_found' | @@ -72,7 +75,11 @@ -spec(info/2 :: (exchange(), [info_key()]) -> [info()]). -spec(info_all/1 :: (vhost()) -> [[info()]]). -spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]). --spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}). +-spec(simple_publish/6 :: + (bool(), bool(), exchange_name(), routing_key(), binary(), binary()) -> + publish_res()). +-spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()). +-spec(route/3 :: (exchange(), routing_key(), decoded_content()) -> [pid()]). -spec(add_binding/4 :: (exchange_name(), queue_name(), routing_key(), amqp_table()) -> bind_res() | {'error', 'durability_settings_incompatible'}). @@ -157,7 +164,9 @@ lookup(Name) -> lookup_or_die(Name) -> case lookup(Name) of {ok, X} -> X; - {error, not_found} -> rabbit_misc:not_found(Name) + {error, not_found} -> + rabbit_misc:protocol_error( + not_found, "no ~s", [rabbit_misc:rs(Name)]) end. list(VHostPath) -> @@ -187,41 +196,36 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). -publish(X, Delivery) -> - publish(X, [], Delivery). - -publish(X, Seen, Delivery = #delivery{ - message = #basic_message{routing_key = RK, content = C}}) -> - case rabbit_router:deliver(route(X, RK, C), Delivery) of - {_, []} = R -> - #exchange{name = XName, arguments = Args} = X, - case rabbit_misc:r_arg(XName, exchange, Args, - <<"alternate-exchange">>) of - undefined -> - R; - AName -> - NewSeen = [XName | Seen], - case lists:member(AName, NewSeen) of - true -> - R; - false -> - case lookup(AName) of - {ok, AX} -> - publish(AX, NewSeen, Delivery); - {error, not_found} -> - rabbit_log:warning( - "alternate exchange for ~s " - "does not exist: ~s", - [rabbit_misc:rs(XName), - rabbit_misc:rs(AName)]), - R - end - end - end; - R -> - R +%% Usable by Erlang code that wants to publish messages. +simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, + ContentTypeBin, BodyBin) -> + {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), + Content = #content{class_id = ClassId, + properties = #'P_basic'{content_type = ContentTypeBin}, + properties_bin = none, + payload_fragments_rev = [BodyBin]}, + Message = #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKeyBin, + content = Content, + persistent_key = none}, + simple_publish(Mandatory, Immediate, Message). + +%% Usable by Erlang code that wants to publish messages. +simple_publish(Mandatory, Immediate, + Message = #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKey, + content = Content}) -> + case lookup(ExchangeName) of + {ok, Exchange} -> + QPids = route(Exchange, RoutingKey, Content), + rabbit_router:deliver(QPids, Mandatory, Immediate, + none, Message); + {error, Error} -> {error, Error} end. +sort_arguments(Arguments) -> + lists:keysort(1, Arguments). + %% return the list of qpids to which a message with a given routing %% key, sent to a particular exchange, should be delivered. %% @@ -248,9 +252,6 @@ route(X = #exchange{type = fanout}, _RoutingKey, _Content) -> route(X = #exchange{type = direct}, RoutingKey, _Content) -> match_routing_key(X, RoutingKey). -sort_arguments(Arguments) -> - lists:keysort(1, Arguments). - %% TODO: Maybe this should be handled by a cursor instead. %% TODO: This causes a full scan for each entry with the same exchange match_bindings(#exchange{name = Name}, Match) -> @@ -346,13 +347,16 @@ exchanges_for_queue(QueueName) -> sets:from_list( mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))). -contains(Table, MatchHead) -> +has_bindings(ExchangeName) -> + MatchHead = #route{binding = #binding{exchange_name = ExchangeName, + _ = '_'}}, try - continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)) + continue(mnesia:select(rabbit_route, [{MatchHead, [], ['$_']}], + 1, read)) catch exit:{aborted, {badarg, _}} -> %% work around OTP-7025, which was fixed in R12B-1, by %% falling back on a less efficient method - case mnesia:match_object(Table, MatchHead, read) of + case mnesia:match_object(rabbit_route, MatchHead, read) of [] -> false; [_|_] -> true end @@ -382,40 +386,32 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) -> end). add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> - binding_action( - ExchangeName, QueueName, RoutingKey, Arguments, - fun (X, Q, B) -> + call_with_exchange_and_queue( + ExchangeName, QueueName, + fun (X, Q) -> if Q#amqqueue.durable and not(X#exchange.durable) -> {error, durability_settings_incompatible}; - true -> ok = sync_binding(B, Q#amqqueue.durable, - fun mnesia:write/3) + true -> ok = sync_binding( + ExchangeName, QueueName, RoutingKey, Arguments, + Q#amqqueue.durable, fun mnesia:write/3) end end). delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> - binding_action( - ExchangeName, QueueName, RoutingKey, Arguments, - fun (X, Q, B) -> - case mnesia:match_object(rabbit_route, #route{binding = B}, - write) of - [] -> {error, binding_not_found}; - _ -> ok = sync_binding(B, Q#amqqueue.durable, - fun mnesia:delete_object/3), - maybe_auto_delete(X) - end - end). - -binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) -> call_with_exchange_and_queue( ExchangeName, QueueName, fun (X, Q) -> - Fun(X, Q, #binding{exchange_name = ExchangeName, - queue_name = QueueName, - key = RoutingKey, - args = sort_arguments(Arguments)}) + ok = sync_binding( + ExchangeName, QueueName, RoutingKey, Arguments, + Q#amqqueue.durable, fun mnesia:delete_object/3), + maybe_auto_delete(X) end). -sync_binding(Binding, Durable, Fun) -> +sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) -> + Binding = #binding{exchange_name = ExchangeName, + queue_name = QueueName, + key = RoutingKey, + args = sort_arguments(Arguments)}, ok = case Durable of true -> Fun(rabbit_durable_route, #route{binding = Binding}, write); @@ -481,7 +477,7 @@ parse_x_match(Other) -> %% Horrendous matching algorithm. Depends for its merge-like %% (linear-time) behaviour on the lists:keysort (sort_arguments) that -%% route/3 and {add,delete}_binding/4 do. +%% route/3 and sync_binding/6 do. %% %% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! %% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. @@ -570,11 +566,7 @@ maybe_auto_delete(Exchange = #exchange{auto_delete = true}) -> ok. conditional_delete(Exchange = #exchange{name = ExchangeName}) -> - Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, - %% we need to check for durable routes here too in case a bunch of - %% routes to durable queues have been removed temporarily as a - %% result of a node failure - case contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match) of + case has_bindings(ExchangeName) of false -> unconditional_delete(Exchange); true -> {error, in_use} end. diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index 5c447792..060bed48 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -95,15 +95,13 @@ collect_content(ChannelPid, MethodName) -> true -> rabbit_misc:protocol_error( command_invalid, - "expected content header for class ~w, " - "got one for class ~w instead", + "expected content header for class ~w, got one for class ~w instead", [ClassId, HeaderClassId]) end; _ -> rabbit_misc:protocol_error( command_invalid, - "expected content header for class ~w, " - "got non content header frame instead", + "expected content header for class ~w, got non content header frame instead", [ClassId]) end. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 72e16f0f..4da247a4 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -51,7 +51,6 @@ -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). --export([start_applications/1, stop_applications/1]). -import(mnesia). -import(lists). @@ -110,9 +109,13 @@ -spec(dirty_dump_log/1 :: (string()) -> 'ok' | {'error', any()}). -spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}). -spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok'). +<<<<<<< /tmp/rabbitmq-server/src/rabbit_misc.erl +-spec(format_stderr/2 :: (string(), [any()]) -> 'true'). +======= -spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). -spec(start_applications/1 :: ([atom()]) -> 'ok'). -spec(stop_applications/1 :: ([atom()]) -> 'ok'). +>>>>>>> /tmp/rabbit_misc.erl~other.qjyLOB -endif. @@ -249,7 +252,7 @@ filter_exit_map(F, L) -> with_user(Username, Thunk) -> fun () -> - case mnesia:read({rabbit_user, Username}) of + case mnesia:read({user, Username}) of [] -> mnesia:abort({no_such_user, Username}); [_U] -> @@ -259,7 +262,7 @@ with_user(Username, Thunk) -> with_vhost(VHostPath, Thunk) -> fun () -> - case mnesia:read({rabbit_vhost, VHostPath}) of + case mnesia:read({vhost, VHostPath}) of [] -> mnesia:abort({no_such_vhost, VHostPath}); [_V] -> @@ -389,6 +392,11 @@ ensure_parent_dirs_exist(Filename) -> end. format_stderr(Fmt, Args) -> +<<<<<<< /tmp/rabbitmq-server/src/rabbit_misc.erl + Port = open_port({fd, 0, 2}, [out]), + port_command(Port, io_lib:format(Fmt, Args)), + port_close(Port). +======= case os:type() of {unix, _} -> Port = open_port({fd, 0, 2}, [out]), @@ -431,3 +439,4 @@ stop_applications(Apps) -> cannot_stop_application, Apps). +>>>>>>> /tmp/rabbit_misc.erl~other.qjyLOB diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 575ecb0a..0c573073 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -101,50 +101,33 @@ force_reset() -> reset(true). %%-------------------------------------------------------------------- table_definitions() -> - [{rabbit_user, - [{record_name, user}, - {attributes, record_info(fields, user)}, - {disc_copies, [node()]}]}, - {rabbit_user_permission, - [{record_name, user_permission}, - {attributes, record_info(fields, user_permission)}, - {disc_copies, [node()]}]}, - {rabbit_vhost, - [{record_name, vhost}, - {attributes, record_info(fields, vhost)}, - {disc_copies, [node()]}]}, - {rabbit_config, - [{disc_copies, [node()]}]}, - {rabbit_listener, - [{record_name, listener}, - {attributes, record_info(fields, listener)}, - {type, bag}]}, - {rabbit_durable_route, - [{record_name, route}, - {attributes, record_info(fields, route)}, - {disc_copies, [node()]}]}, - {rabbit_route, - [{record_name, route}, - {attributes, record_info(fields, route)}, - {type, ordered_set}]}, - {rabbit_reverse_route, - [{record_name, reverse_route}, - {attributes, record_info(fields, reverse_route)}, - {type, ordered_set}]}, - {rabbit_durable_exchange, - [{record_name, exchange}, - {attributes, record_info(fields, exchange)}, - {disc_copies, [node()]}]}, - {rabbit_exchange, - [{record_name, exchange}, - {attributes, record_info(fields, exchange)}]}, - {rabbit_durable_queue, - [{record_name, amqqueue}, - {attributes, record_info(fields, amqqueue)}, - {disc_copies, [node()]}]}, - {rabbit_queue, - [{record_name, amqqueue}, - {attributes, record_info(fields, amqqueue)}]}]. + [{user, [{disc_copies, [node()]}, + {attributes, record_info(fields, user)}]}, + {user_vhost, [{type, bag}, + {disc_copies, [node()]}, + {attributes, record_info(fields, user_vhost)}, + {index, [virtual_host]}]}, + {vhost, [{disc_copies, [node()]}, + {attributes, record_info(fields, vhost)}]}, + {rabbit_config, [{disc_copies, [node()]}]}, + {listener, [{type, bag}, + {attributes, record_info(fields, listener)}]}, + {durable_routes, [{disc_copies, [node()]}, + {record_name, route}, + {attributes, record_info(fields, route)}]}, + {route, [{type, ordered_set}, + {attributes, record_info(fields, route)}]}, + {reverse_route, [{type, ordered_set}, + {attributes, record_info(fields, reverse_route)}]}, + {durable_exchanges, [{disc_copies, [node()]}, + {record_name, exchange}, + {attributes, record_info(fields, exchange)}]}, + {exchange, [{attributes, record_info(fields, exchange)}]}, + {durable_queues, [{disc_copies, [node()]}, + {record_name, amqqueue}, + {attributes, record_info(fields, amqqueue)}]}, + {amqqueue, [{attributes, record_info(fields, amqqueue)}, + {index, [pid]}]}]. table_names() -> [Tab || {Tab, _} <- table_definitions()]. @@ -263,8 +246,8 @@ init_db(ClusterNodes) -> %% NB: we cannot use rabbit_log here since %% it may not have been started yet error_logger:warning_msg( - "schema integrity check failed: ~p~n" - "moving database to backup location " + "schema integrity check failed: ~p~n" ++ + "moving database to backup location " ++ "and recreating schema from scratch~n", [Reason]), ok = move_db(), diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 2dbd5a5a..99ea37d8 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -123,7 +123,6 @@ stop_tcp_listener(Host, Port) -> tcp_listener_started(IPAddress, Port) -> ok = mnesia:dirty_write( - rabbit_listener, #listener{node = node(), protocol = tcp, host = tcp_host(IPAddress), @@ -131,20 +130,19 @@ tcp_listener_started(IPAddress, Port) -> tcp_listener_stopped(IPAddress, Port) -> ok = mnesia:dirty_delete_object( - rabbit_listener, #listener{node = node(), protocol = tcp, host = tcp_host(IPAddress), port = Port}). active_listeners() -> - rabbit_misc:dirty_read_all(rabbit_listener). + rabbit_misc:dirty_read_all(listener). node_listeners(Node) -> - mnesia:dirty_read(rabbit_listener, Node). + mnesia:dirty_read(listener, Node). on_node_down(Node) -> - ok = mnesia:dirty_delete(rabbit_listener, Node). + ok = mnesia:dirty_delete(listener, Node). start_client(Sock) -> {ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []), diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index a67b2edc..985ca3e2 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -105,18 +105,10 @@ %% terminate_channel timeout -> remove 'closing' mark, *closing* %% handshake_timeout -> ignore, *closing* %% heartbeat timeout -> *throw* -%% channel exit with hard error -%% -> log error, wait for channels to terminate forcefully, start -%% terminate_connection timer, send close, *closed* -%% channel exit with soft error -%% -> log error, start terminate_channel timer, mark channel as -%% closing -%% if last channel to exit then send connection.close_ok, -%% start terminate_connection timer, *closed* -%% else *closing* -%% channel exits normally -%% -> if last channel to exit then send connection.close_ok, -%% start terminate_connection timer, *closed* +%% channel exit -> +%% if abnormal exit then log error +%% if last channel to exit then send connection.close_ok, start +%% terminate_connection timer, *closing* %% closed: %% socket close -> *terminate* %% receive connection.close_ok -> self() ! terminate_connection, @@ -173,8 +165,7 @@ setup_profiling() -> Value = rabbit_misc:get_config(profiling_enabled, false), case Value of once -> - rabbit_log:info("Enabling profiling for this connection, " - "and disabling for subsequent.~n"), + rabbit_log:info("Enabling profiling for this connection, and disabling for subsequent.~n"), rabbit_misc:set_config(profiling_enabled, false), fprof:trace(start); true -> @@ -288,8 +279,6 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> exit(Reason); {'EXIT', _Pid, E = {writer, send_failed, _Error}} -> throw(E); - {channel_exit, Channel, Reason} -> - mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State)); {'EXIT', Pid, Reason} -> mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State)); {terminate_channel, Channel, Ref1} -> @@ -357,6 +346,7 @@ terminate_channel(Channel, Ref, State) -> end, State. +<<<<<<< local handle_channel_exit(Channel, Reason, State) -> %% We remove the channel from the inbound map only. That allows %% the channel to be re-opened, but also means the remaining @@ -365,13 +355,26 @@ handle_channel_exit(Channel, Reason, State) -> erase({channel, Channel}), handle_exception(State, Channel, Reason). +handle_dependent_exit(Pid, Reason, + State = #v1{connection_state = closing}) -> + case channel_cleanup(Pid) of + undefined -> exit({abnormal_dependent_exit, Pid, Reason}); + Channel -> + case Reason of + normal -> ok; + _ -> log_channel_error(closing, Channel, Reason) + end, + maybe_close(State) + end; +======= +>>>>>>> other handle_dependent_exit(Pid, normal, State) -> channel_cleanup(Pid), - maybe_close(State); + State; handle_dependent_exit(Pid, Reason, State) -> case channel_cleanup(Pid) of undefined -> exit({abnormal_dependent_exit, Pid, Reason}); - Channel -> maybe_close(handle_exception(State, Channel, Reason)) + Channel -> handle_exception(State, Channel, Reason) end. channel_cleanup(Pid) -> @@ -419,8 +422,7 @@ wait_for_channel_termination(N, TimerRef) -> normal -> ok; _ -> rabbit_log:error( - "connection ~p, channel ~p - " - "error while terminating:~n~p~n", + "connection ~p, channel ~p - error while terminating:~n~p~n", [self(), Channel, Reason]) end, wait_for_channel_termination(N-1, TimerRef) @@ -429,15 +431,13 @@ wait_for_channel_termination(N, TimerRef) -> exit(channel_termination_timeout) end. -maybe_close(State = #v1{connection_state = closing}) -> +maybe_close(State) -> case all_channels() of [] -> ok = send_on_channel0( State#v1.sock, #'connection.close_ok'{}), close_connection(State); _ -> State - end; -maybe_close(State) -> - State. + end. handle_frame(Type, 0, Payload, State = #v1{connection_state = CS}) when CS =:= closing; CS =:= closed -> @@ -725,8 +725,8 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> vhost = VHost}} = State, WriterPid = rabbit_writer:start(Sock, Channel, FrameMax), ChPid = rabbit_framing_channel:start_link( - fun rabbit_channel:start_link/5, - [Channel, self(), WriterPid, Username, VHost]), + fun rabbit_channel:start_link/4, + [self(), WriterPid, Username, VHost]), put({channel, Channel}, {chpid, ChPid}), put({chpid, ChPid}, {channel, Channel}), ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame); diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 10f80cc3..0b06a063 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -35,7 +35,7 @@ -behaviour(gen_server2). -export([start_link/0, - deliver/2]). + deliver/5]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -50,7 +50,8 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). --spec(deliver/2 :: ([pid()], delivery()) -> {routing_result(), [pid()]}). +-spec(deliver/5 :: ([pid()], bool(), bool(), maybe(txn()), message()) -> + {'ok', [pid()]} | {'error', 'unroutable' | 'not_delivered'}). -endif. @@ -61,13 +62,13 @@ start_link() -> -ifdef(BUG19758). -deliver(QPids, Delivery) -> - check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, - run_bindings(QPids, Delivery)). +deliver(QPids, Mandatory, Immediate, Txn, Message) -> + check_delivery(Mandatory, Immediate, + run_bindings(QPids, Mandatory, Immediate, Txn, Message)). -else. -deliver(QPids, Delivery) -> +deliver(QPids, Mandatory, Immediate, Txn, Message) -> %% we reduce inter-node traffic by grouping the qpids by node and %% only delivering one copy of the message to each node involved, %% which then in turn delivers it to its queues. @@ -80,14 +81,16 @@ deliver(QPids, Delivery) -> [QPid], D) end, dict:new(), QPids)), - Delivery). + Mandatory, Immediate, Txn, Message). -deliver_per_node([{Node, QPids}], Delivery) when Node == node() -> +deliver_per_node([{Node, QPids}], Mandatory, Immediate, + Txn, Message) + when Node == node() -> %% optimisation - check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, - run_bindings(QPids, Delivery)); -deliver_per_node(NodeQPids, Delivery = #delivery{mandatory = false, - immediate = false}) -> + check_delivery(Mandatory, Immediate, + run_bindings(QPids, Mandatory, Immediate, Txn, Message)); +deliver_per_node(NodeQPids, Mandatory = false, Immediate = false, + Txn, Message) -> %% optimisation: when Mandatory = false and Immediate = false, %% rabbit_amqqueue:deliver in run_bindings below will deliver the %% message to the queue process asynchronously, and return true, @@ -95,19 +98,22 @@ deliver_per_node(NodeQPids, Delivery = #delivery{mandatory = false, %% therefore safe to use a fire-and-forget cast here and return %% the QPids - the semantics is preserved. This scales much better %% than the non-immediate case below. - {routed, - lists:flatmap( - fun ({Node, QPids}) -> - gen_server2:cast({?SERVER, Node}, {deliver, QPids, Delivery}), - QPids - end, - NodeQPids)}; -deliver_per_node(NodeQPids, Delivery) -> + {ok, lists:flatmap( + fun ({Node, QPids}) -> + gen_server2:cast( + {?SERVER, Node}, + {deliver, QPids, Mandatory, Immediate, Txn, Message}), + QPids + end, + NodeQPids)}; +deliver_per_node(NodeQPids, Mandatory, Immediate, + Txn, Message) -> R = rabbit_misc:upmap( fun ({Node, QPids}) -> - try gen_server2:call({?SERVER, Node}, - {deliver, QPids, Delivery}, - infinity) + try gen_server2:call( + {?SERVER, Node}, + {deliver, QPids, Mandatory, Immediate, Txn, Message}, + infinity) catch _Class:_Reason -> %% TODO: figure out what to log (and do!) here @@ -124,8 +130,7 @@ deliver_per_node(NodeQPids, Delivery) -> end, {false, []}, R), - check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, - {Routed, lists:append(Handled)}). + check_delivery(Mandatory, Immediate, {Routed, lists:append(Handled)}). -endif. @@ -134,17 +139,19 @@ deliver_per_node(NodeQPids, Delivery) -> init([]) -> {ok, no_state}. -handle_call({deliver, QPids, Delivery}, From, State) -> +handle_call({deliver, QPids, Mandatory, Immediate, Txn, Message}, + From, State) -> spawn( fun () -> - R = run_bindings(QPids, Delivery), + R = run_bindings(QPids, Mandatory, Immediate, Txn, Message), gen_server2:reply(From, R) end), {noreply, State}. -handle_cast({deliver, QPids, Delivery}, State) -> +handle_cast({deliver, QPids, Mandatory, Immediate, Txn, Message}, + State) -> %% in order to preserve message ordering we must not spawn here - run_bindings(QPids, Delivery), + run_bindings(QPids, Mandatory, Immediate, Txn, Message), {noreply, State}. handle_info(_Info, State) -> @@ -158,10 +165,11 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- -run_bindings(QPids, Delivery) -> +run_bindings(QPids, IsMandatory, IsImmediate, Txn, Message) -> lists:foldl( fun (QPid, {Routed, Handled}) -> - case catch rabbit_amqqueue:deliver(QPid, Delivery) of + case catch rabbit_amqqueue:deliver(IsMandatory, IsImmediate, + Txn, Message, QPid) of true -> {true, [QPid | Handled]}; false -> {true, Handled}; {'EXIT', _Reason} -> {Routed, Handled} @@ -171,6 +179,6 @@ run_bindings(QPids, Delivery) -> QPids). %% check_delivery(Mandatory, Immediate, {WasRouted, QPids}) -check_delivery(true, _ , {false, []}) -> {unroutable, []}; -check_delivery(_ , true, {_ , []}) -> {not_delivered, []}; -check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}. +check_delivery(true, _ , {false, []}) -> {error, unroutable}; +check_delivery(_ , true, {_ , []}) -> {error, not_delivered}; +check_delivery(_ , _ , {_ , Qs}) -> {ok, Qs}. diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl index 2a365ce1..9e4c9c8a 100644 --- a/src/rabbit_sasl_report_file_h.erl +++ b/src/rabbit_sasl_report_file_h.erl @@ -47,7 +47,7 @@ init({{File, Suffix}, []}) -> case rabbit_misc:append_file(File, Suffix) of ok -> ok; {error, Error} -> - rabbit_log:error("Failed to append contents of " + rabbit_log:error("Failed to append contents of " ++ "sasl log file '~s' to '~s':~n~p~n", [File, [File, Suffix], Error]) end, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 8f0a3a89..bcfce33a 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -507,16 +507,17 @@ test_user_management() -> {error, {no_such_vhost, _}} = control_action(delete_vhost, ["/testhost"]), {error, {no_such_user, _}} = - control_action(set_permissions, ["foo", ".*", ".*", ".*"]), + control_action(map_user_vhost, ["foo", "/"]), {error, {no_such_user, _}} = - control_action(clear_permissions, ["foo"]), + control_action(unmap_user_vhost, ["foo", "/"]), {error, {no_such_user, _}} = - control_action(list_user_permissions, ["foo"]), + control_action(list_user_vhosts, ["foo"]), {error, {no_such_vhost, _}} = - control_action(list_permissions, ["-p", "/testhost"]), - {error, {invalid_regexp, _, _}} = - control_action(set_permissions, ["guest", "+foo", ".*", ".*"]), - + control_action(map_user_vhost, ["guest", "/testhost"]), + {error, {no_such_vhost, _}} = + control_action(unmap_user_vhost, ["guest", "/testhost"]), + {error, {no_such_vhost, _}} = + control_action(list_vhost_users, ["/testhost"]), %% user creation ok = control_action(add_user, ["foo", "bar"]), {error, {user_already_exists, _}} = @@ -531,16 +532,13 @@ test_user_management() -> ok = control_action(list_vhosts, []), %% user/vhost mapping - ok = control_action(set_permissions, ["-p", "/testhost", - "foo", ".*", ".*", ".*"]), - ok = control_action(set_permissions, ["-p", "/testhost", - "foo", ".*", ".*", ".*"]), - ok = control_action(list_permissions, ["-p", "/testhost"]), - ok = control_action(list_user_permissions, ["foo"]), + ok = control_action(map_user_vhost, ["foo", "/testhost"]), + ok = control_action(map_user_vhost, ["foo", "/testhost"]), + ok = control_action(list_user_vhosts, ["foo"]), %% user/vhost unmapping - ok = control_action(clear_permissions, ["-p", "/testhost", "foo"]), - ok = control_action(clear_permissions, ["-p", "/testhost", "foo"]), + ok = control_action(unmap_user_vhost, ["foo", "/testhost"]), + ok = control_action(unmap_user_vhost, ["foo", "/testhost"]), %% vhost deletion ok = control_action(delete_vhost, ["/testhost"]), @@ -549,8 +547,7 @@ test_user_management() -> %% deleting a populated vhost ok = control_action(add_vhost, ["/testhost"]), - ok = control_action(set_permissions, ["-p", "/testhost", - "foo", ".*", ".*", ".*"]), + ok = control_action(map_user_vhost, ["foo", "/testhost"]), ok = control_action(delete_vhost, ["/testhost"]), %% user deletion |