summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2010-12-13 17:25:43 +0000
committerEmile Joubert <emile@rabbitmq.com>2010-12-13 17:25:43 +0000
commit0e6c890f235b6e0582302aabd386598beac22c90 (patch)
treefe0ed1b285e80bcb64ad8835b73f68e27c28dc75
parent25cb0d23f23af7fdb4c8da2e9a1c504d7755aa46 (diff)
parent81339d71c5f73325fcc1b46f75b4d1c8b289ec1f (diff)
downloadrabbitmq-server-0e6c890f235b6e0582302aabd386598beac22c90.tar.gz
Merged bug23506 into default
-rw-r--r--docs/rabbitmqctl.1.xml4
-rw-r--r--ebin/rabbit_app.in3
-rw-r--r--include/rabbit_auth_mechanism_spec.hrl41
-rwxr-xr-xscripts/rabbitmq-env2
-rwxr-xr-xscripts/rabbitmq-multi3
-rwxr-xr-xscripts/rabbitmq-server3
-rw-r--r--scripts/rabbitmq-server.bat1
-rwxr-xr-xscripts/rabbitmqctl3
-rw-r--r--src/rabbit.erl6
-rw-r--r--src/rabbit_access_control.erl55
-rw-r--r--src/rabbit_auth_mechanism.erl57
-rw-r--r--src/rabbit_auth_mechanism_amqplain.erl70
-rw-r--r--src/rabbit_auth_mechanism_cr_demo.erl74
-rw-r--r--src/rabbit_auth_mechanism_external.erl107
-rw-r--r--src/rabbit_auth_mechanism_plain.erl61
-rw-r--r--src/rabbit_binding.erl2
-rw-r--r--src/rabbit_channel.erl147
-rw-r--r--src/rabbit_exchange.erl6
-rw-r--r--src/rabbit_exchange_type_direct.erl6
-rw-r--r--src/rabbit_exchange_type_fanout.erl6
-rw-r--r--src/rabbit_exchange_type_headers.erl6
-rw-r--r--src/rabbit_exchange_type_topic.erl6
-rw-r--r--src/rabbit_mnesia.erl2
-rw-r--r--src/rabbit_prelaunch.erl16
-rw-r--r--src/rabbit_reader.erl107
-rw-r--r--src/rabbit_registry.erl (renamed from src/rabbit_exchange_type_registry.erl)44
-rw-r--r--src/rabbit_ssl.erl25
-rw-r--r--src/rabbit_variable_queue.erl2
28 files changed, 669 insertions, 196 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index e4f773c0..35de1cea 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1024,6 +1024,10 @@
<listitem><para>Version of the AMQP protocol in use (currently one of <command>{0,9,1}</command> or <command>{0,8,0}</command>). Note that if a client requests an AMQP 0-9 connection, we treat it as AMQP 0-9-1.</para></listitem>
</varlistentry>
<varlistentry>
+ <term>auth_mechanism</term>
+ <listitem><para>SASL authentication mechanism used, such as <command>PLAIN</command>.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>user</term>
<listitem><para>Username associated with the connection.</para></listitem>
</varlistentry>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 6c33ef8b..3888f198 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -30,4 +30,5 @@
{default_permissions, [<<".*">>, <<".*">>, <<".*">>]},
{cluster_nodes, []},
{server_properties, []},
- {collect_statistics, none}]}]}.
+ {collect_statistics, none},
+ {auth_mechanisms, ['PLAIN', 'AMQPLAIN']} ]} ]}.
diff --git a/include/rabbit_auth_mechanism_spec.hrl b/include/rabbit_auth_mechanism_spec.hrl
new file mode 100644
index 00000000..93aa40bd
--- /dev/null
+++ b/include/rabbit_auth_mechanism_spec.hrl
@@ -0,0 +1,41 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+-ifdef(use_specs).
+
+-spec(description/0 :: () -> [{atom(), any()}]).
+-spec(init/1 :: (rabbit_net:socket()) -> any()).
+-spec(handle_response/2 :: (binary(), any()) ->
+ {'ok', rabbit_types:user()} |
+ {'challenge', binary(), any()} |
+ {'protocol_error', string(), [any()]} |
+ {'refused', rabbit_access_control:username()}).
+
+-endif.
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index 36734874..8cb470d0 100755
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -48,6 +48,8 @@ done
SCRIPT_DIR=`dirname $SCRIPT_PATH`
RABBITMQ_HOME="${SCRIPT_DIR}/.."
+[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname`
+NODENAME=rabbit@${HOSTNAME%%.*}
# Load configuration from the rabbitmq.conf file
[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf
diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi
index 59050692..33883702 100755
--- a/scripts/rabbitmq-multi
+++ b/scripts/rabbitmq-multi
@@ -29,8 +29,7 @@
##
## Contributor(s): ______________________________________.
##
-[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s`
-NODENAME=rabbit@${HOSTNAME%%.*}
+
SCRIPT_HOME=$(dirname $0)
PIDS_FILE=/var/lib/rabbitmq/pids
MULTI_ERL_ARGS=
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 66ce4384..4155b31d 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -30,8 +30,6 @@
## Contributor(s): ______________________________________.
##
-[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s`
-NODENAME=rabbit@${HOSTNAME%%.*}
SERVER_ERL_ARGS="+K true +A30 +P 1048576 \
-kernel inet_default_listen_options [{nodelay,true}] \
-kernel inet_default_connect_options [{nodelay,true}]"
@@ -92,6 +90,7 @@ if [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then
-noinput \
-hidden \
-s rabbit_prelaunch \
+ -sname rabbitmqprelaunch$$ \
-extra "$RABBITMQ_PLUGINS_DIR" "${RABBITMQ_PLUGINS_EXPAND_DIR}" "${RABBITMQ_NODENAME}"
then
RABBITMQ_BOOT_FILE="${RABBITMQ_PLUGINS_EXPAND_DIR}/rabbit"
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 872c87e3..52a250c6 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -118,6 +118,7 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
-pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
-s rabbit_prelaunch ^
+-sname rabbitmqprelaunch%RANDOM% ^
-extra "!RABBITMQ_PLUGINS_DIR:\=/!" ^
"!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!" ^
"!RABBITMQ_NODENAME!"
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl
index 76ce25fd..56cff891 100755
--- a/scripts/rabbitmqctl
+++ b/scripts/rabbitmqctl
@@ -30,9 +30,6 @@
## Contributor(s): ______________________________________.
##
-[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s`
-NODENAME=rabbit@${HOSTNAME%%.*}
-
. `dirname $0`/rabbitmq-env
[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
diff --git a/src/rabbit.erl b/src/rabbit.erl
index ace8f286..2ebfdecf 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -69,10 +69,10 @@
-rabbit_boot_step({external_infrastructure,
[{description, "external infrastructure ready"}]}).
--rabbit_boot_step({rabbit_exchange_type_registry,
- [{description, "exchange type registry"},
+-rabbit_boot_step({rabbit_registry,
+ [{description, "plugin registry"},
{mfa, {rabbit_sup, start_child,
- [rabbit_exchange_type_registry]}},
+ [rabbit_registry]}},
{requires, external_infrastructure},
{enables, kernel_ready}]}).
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 9141e7cd..f2d2b016 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -33,7 +33,7 @@
-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
--export([check_login/2, user_pass_login/2, check_user_pass_login/2,
+-export([user_pass_login/2, check_user_pass_login/2, make_salt/0,
check_vhost_access/2, check_resource_access/3]).
-export([add_user/2, delete_user/1, change_password/2, set_admin/1,
clear_admin/1, list_users/0, lookup_user/1, clear_password/1]).
@@ -54,15 +54,13 @@
-type(password() :: binary()).
-type(password_hash() :: binary()).
-type(regexp() :: binary()).
--spec(check_login/2 ::
- (binary(), binary()) -> rabbit_types:user() |
- rabbit_types:channel_exit()).
-spec(user_pass_login/2 ::
(username(), password())
-> rabbit_types:user() | rabbit_types:channel_exit()).
-spec(check_user_pass_login/2 ::
(username(), password())
- -> {'ok', rabbit_types:user()} | 'refused').
+ -> {'ok', rabbit_types:user()} | {'refused', username()}).
+-spec(make_salt/0 :: () -> binary()).
-spec(check_vhost_access/2 ::
(rabbit_types:user(), rabbit_types:vhost())
-> 'ok' | rabbit_types:channel_exit()).
@@ -101,54 +99,27 @@
%%----------------------------------------------------------------------------
-%% SASL PLAIN, as used by the Qpid Java client and our clients. Also,
-%% apparently, by OpenAMQ.
-check_login(<<"PLAIN">>, Response) ->
- [User, Pass] = [list_to_binary(T) ||
- T <- string:tokens(binary_to_list(Response), [0])],
- user_pass_login(User, Pass);
-%% AMQPLAIN, as used by Qpid Python test suite. The 0-8 spec actually
-%% defines this as PLAIN, but in 0-9 that definition is gone, instead
-%% referring generically to "SASL security mechanism", i.e. the above.
-check_login(<<"AMQPLAIN">>, Response) ->
- LoginTable = rabbit_binary_parser:parse_table(Response),
- case {lists:keysearch(<<"LOGIN">>, 1, LoginTable),
- lists:keysearch(<<"PASSWORD">>, 1, LoginTable)} of
- {{value, {_, longstr, User}},
- {value, {_, longstr, Pass}}} ->
- user_pass_login(User, Pass);
- _ ->
- %% Is this an information leak?
- rabbit_misc:protocol_error(
- access_refused,
- "AMQPPLAIN auth info ~w is missing LOGIN or PASSWORD field",
- [LoginTable])
- end;
-
-check_login(Mechanism, _Response) ->
- rabbit_misc:protocol_error(
- access_refused, "unsupported authentication mechanism '~s'",
- [Mechanism]).
-
user_pass_login(User, Pass) ->
?LOGDEBUG("Login with user ~p pass ~p~n", [User, Pass]),
case check_user_pass_login(User, Pass) of
- refused ->
+ {refused, _} ->
rabbit_misc:protocol_error(
access_refused, "login refused for user '~s'", [User]);
{ok, U} ->
U
end.
-check_user_pass_login(User, Pass) ->
- case lookup_user(User) of
- {ok, U} ->
- case check_password(Pass, U#user.password_hash) of
- true -> {ok, U};
- _ -> refused
+check_user_pass_login(Username, Pass) ->
+ Refused = {refused, io_lib:format("user '~s' - invalid credentials",
+ [Username])},
+ case lookup_user(Username) of
+ {ok, User} ->
+ case check_password(Pass, User#user.password_hash) of
+ true -> {ok, User};
+ _ -> Refused
end;
{error, not_found} ->
- refused
+ Refused
end.
internal_lookup_vhost_access(Username, VHostPath) ->
diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl
new file mode 100644
index 00000000..1258cb8d
--- /dev/null
+++ b/src/rabbit_auth_mechanism.erl
@@ -0,0 +1,57 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_mechanism).
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [
+ %% A description.
+ {description, 0},
+
+ %% Called before authentication starts. Should create a state
+ %% object to be passed through all the stages of authentication.
+ {init, 1},
+
+ %% Handle a stage of authentication. Possible responses:
+ %% {ok, User}
+ %% Authentication succeeded, and here's the user record.
+ %% {challenge, Challenge, NextState}
+ %% Another round is needed. Here's the state I want next time.
+ %% {protocol_error, Msg, Args}
+ %% Client got the protocol wrong. Log and die.
+ %% {refused, Username}
+ %% Client failed authentication. Log and die.
+ {handle_response, 2}
+ ];
+behaviour_info(_Other) ->
+ undefined.
diff --git a/src/rabbit_auth_mechanism_amqplain.erl b/src/rabbit_auth_mechanism_amqplain.erl
new file mode 100644
index 00000000..5d51d904
--- /dev/null
+++ b/src/rabbit_auth_mechanism_amqplain.erl
@@ -0,0 +1,70 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_mechanism_amqplain).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_auth_mechanism).
+
+-export([description/0, init/1, handle_response/2]).
+
+-include("rabbit_auth_mechanism_spec.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "auth mechanism amqplain"},
+ {mfa, {rabbit_registry, register,
+ [auth_mechanism, <<"AMQPLAIN">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+%% AMQPLAIN, as used by Qpid Python test suite. The 0-8 spec actually
+%% defines this as PLAIN, but in 0-9 that definition is gone, instead
+%% referring generically to "SASL security mechanism", i.e. the above.
+
+description() ->
+ [{name, <<"AMQPLAIN">>},
+ {description, <<"QPid AMQPLAIN mechanism">>}].
+
+init(_Sock) ->
+ [].
+
+handle_response(Response, _State) ->
+ LoginTable = rabbit_binary_parser:parse_table(Response),
+ case {lists:keysearch(<<"LOGIN">>, 1, LoginTable),
+ lists:keysearch(<<"PASSWORD">>, 1, LoginTable)} of
+ {{value, {_, longstr, User}},
+ {value, {_, longstr, Pass}}} ->
+ rabbit_access_control:check_user_pass_login(User, Pass);
+ _ ->
+ {protocol_error,
+ "AMQPLAIN auth info ~w is missing LOGIN or PASSWORD field",
+ [LoginTable]}
+ end.
diff --git a/src/rabbit_auth_mechanism_cr_demo.erl b/src/rabbit_auth_mechanism_cr_demo.erl
new file mode 100644
index 00000000..67665928
--- /dev/null
+++ b/src/rabbit_auth_mechanism_cr_demo.erl
@@ -0,0 +1,74 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_mechanism_cr_demo).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_auth_mechanism).
+
+-export([description/0, init/1, handle_response/2]).
+
+-include("rabbit_auth_mechanism_spec.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "auth mechanism cr-demo"},
+ {mfa, {rabbit_registry, register,
+ [auth_mechanism, <<"RABBIT-CR-DEMO">>,
+ ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+-record(state, {username = undefined}).
+
+%% Provides equivalent security to PLAIN but demos use of Connection.Secure(Ok)
+%% START-OK: Username
+%% SECURE: "Please tell me your password"
+%% SECURE-OK: "My password is ~s", [Password]
+
+description() ->
+ [{name, <<"RABBIT-CR-DEMO">>},
+ {description, <<"RabbitMQ Demo challenge-response authentication "
+ "mechanism">>}].
+
+init(_Sock) ->
+ #state{}.
+
+handle_response(Response, State = #state{username = undefined}) ->
+ {challenge, <<"Please tell me your password">>,
+ State#state{username = Response}};
+
+handle_response(Response, #state{username = Username}) ->
+ case Response of
+ <<"My password is ", Password/binary>> ->
+ rabbit_access_control:check_user_pass_login(Username, Password);
+ _ ->
+ {protocol_error, "Invalid response '~s'", [Response]}
+ end.
diff --git a/src/rabbit_auth_mechanism_external.erl b/src/rabbit_auth_mechanism_external.erl
new file mode 100644
index 00000000..b21dd313
--- /dev/null
+++ b/src/rabbit_auth_mechanism_external.erl
@@ -0,0 +1,107 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_mechanism_external).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_auth_mechanism).
+
+-export([description/0, init/1, handle_response/2]).
+
+-include("rabbit_auth_mechanism_spec.hrl").
+
+-include_lib("public_key/include/public_key.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "auth mechanism external"},
+ {mfa, {rabbit_registry, register,
+ [auth_mechanism, <<"EXTERNAL">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+-record(state, {username = undefined}).
+
+%% SASL EXTERNAL. SASL says EXTERNAL means "use credentials
+%% established by means external to the mechanism". We define that to
+%% mean the peer certificate's subject's CN.
+
+description() ->
+ [{name, <<"EXTERNAL">>},
+ {description, <<"SASL EXTERNAL authentication mechanism">>}].
+
+init(Sock) ->
+ Username = case rabbit_net:peercert(Sock) of
+ {ok, C} ->
+ CN = case rabbit_ssl:peer_cert_subject_item(
+ C, ?'id-at-commonName') of
+ not_found -> {refused, "no CN found"};
+ CN0 -> list_to_binary(CN0)
+ end,
+ case config_sane() of
+ true -> CN;
+ false -> {refused, "configuration unsafe"}
+ end;
+ {error, no_peercert} ->
+ {refused, "no peer certificate"};
+ nossl ->
+ {refused, "not SSL connection"}
+ end,
+ #state{username = Username}.
+
+handle_response(_Response, #state{username = Username}) ->
+ case Username of
+ {refused, _} = E ->
+ E;
+ _ ->
+ case rabbit_access_control:lookup_user(Username) of
+ {ok, User} ->
+ {ok, User};
+ {error, not_found} ->
+ %% This is not an information leak as we have to
+ %% have validated a client cert to get this far.
+ {refused, io_lib:format("user '~s' not found", [Username])}
+ end
+ end.
+
+%%--------------------------------------------------------------------------
+
+config_sane() ->
+ {ok, Opts} = application:get_env(ssl_options),
+ case {proplists:get_value(fail_if_no_peer_cert, Opts),
+ proplists:get_value(verify, Opts)} of
+ {true, verify_peer} ->
+ true;
+ {F, V} ->
+ rabbit_log:warning("EXTERNAL mechanism disabled, "
+ "fail_if_no_peer_cert=~p; "
+ "verify=~p~n", [F, V]),
+ false
+ end.
diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl
new file mode 100644
index 00000000..8758f85f
--- /dev/null
+++ b/src/rabbit_auth_mechanism_plain.erl
@@ -0,0 +1,61 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_mechanism_plain).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_auth_mechanism).
+
+-export([description/0, init/1, handle_response/2]).
+
+-include("rabbit_auth_mechanism_spec.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "auth mechanism plain"},
+ {mfa, {rabbit_registry, register,
+ [auth_mechanism, <<"PLAIN">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+%% SASL PLAIN, as used by the Qpid Java client and our clients. Also,
+%% apparently, by OpenAMQ.
+
+description() ->
+ [{name, <<"PLAIN">>},
+ {description, <<"SASL PLAIN authentication mechanism">>}].
+
+init(_Sock) ->
+ [].
+
+handle_response(Response, _State) ->
+ [User, Pass] = [list_to_binary(T) ||
+ T <- string:tokens(binary_to_list(Response), [0])],
+ rabbit_access_control:check_user_pass_login(User, Pass).
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 668fb9bb..ccadf5af 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -305,7 +305,7 @@ table_for_resource(#resource{kind = queue}) -> rabbit_queue.
%% Used with atoms from records; e.g., the type is expected to exist.
type_to_module(T) ->
- {ok, Module} = rabbit_exchange_type_registry:lookup_module(T),
+ {ok, Module} = rabbit_registry:lookup_module(exchange, T),
Module.
contains(Table, MatchHead) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a1db2ccf..4e9bd4b1 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -35,10 +35,10 @@
-behaviour(gen_server2).
--export([start_link/7, do/2, do/3, shutdown/1]).
--export([send_command/2, deliver/4, flushed/2]).
+-export([start_link/7, do/2, do/3, flush/1, shutdown/1]).
+-export([send_command/2, deliver/4, flushed/2, confirm/2, flush_confirms/1]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
--export([emit_stats/1, flush/1, flush_multiple_acks/1, confirm/2]).
+-export([emit_stats/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
@@ -72,7 +72,7 @@
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
--define(FLUSH_MULTIPLE_ACKS_INTERVAL, 1000).
+-define(FLUSH_CONFIRMS_INTERVAL, 1000).
%%----------------------------------------------------------------------------
@@ -90,12 +90,15 @@
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
+-spec(flush/1 :: (pid()) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
-spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(deliver/4 ::
(pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
-> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
+-spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok').
+-spec(flush_confirms/1 :: (pid()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (pid()) -> rabbit_types:infos()).
@@ -103,8 +106,6 @@
-spec(info_all/0 :: () -> [rabbit_types:infos()]).
-spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]).
-spec(emit_stats/1 :: (pid()) -> 'ok').
--spec(flush_multiple_acks/1 :: (pid()) -> 'ok').
--spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok').
-endif.
@@ -121,6 +122,9 @@ do(Pid, Method) ->
do(Pid, Method, Content) ->
gen_server2:cast(Pid, {method, Method, Content}).
+flush(Pid) ->
+ gen_server2:call(Pid, flush).
+
shutdown(Pid) ->
gen_server2:cast(Pid, terminate).
@@ -133,6 +137,12 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
+confirm(Pid, MsgSeqNo) ->
+ gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}).
+
+flush_confirms(Pid) ->
+ gen_server2:cast(Pid, flush_confirms).
+
list() ->
pg_local:get_members(rabbit_channels).
@@ -156,15 +166,6 @@ info_all(Items) ->
emit_stats(Pid) ->
gen_server2:cast(Pid, emit_stats).
-flush(Pid) ->
- gen_server2:call(Pid, flush).
-
-flush_multiple_acks(Pid) ->
- gen_server2:cast(Pid, flush_multiple_acks).
-
-confirm(Pid, MsgSeqNo) ->
- gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}).
-
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
@@ -215,6 +216,9 @@ prioritise_cast(Msg, _State) ->
_ -> 0
end.
+handle_call(flush, _From, State) ->
+ reply(ok, State);
+
handle_call(info, _From, State) ->
reply(infos(?INFO_KEYS, State), State);
@@ -224,9 +228,6 @@ handle_call({info, Items}, _From, State) ->
catch Error -> reply({error, Error}, State)
end;
-handle_call(flush, _From, State) ->
- reply(ok, State);
-
handle_call(_Request, _From, State) ->
noreply(State).
@@ -261,10 +262,10 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
noreply(State);
handle_cast({deliver, ConsumerTag, AckRequired,
- Msg = {_QName, QPid, _MsgId, Redelivered,
- #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
- content = Content}}},
+ Msg = {_QName, QPid, _MsgId, Redelivered,
+ #basic_message{exchange_name = ExchangeName,
+ routing_key = RoutingKey,
+ content = Content}}},
State = #ch{writer_pid = WriterPid,
next_tag = DeliveryTag}) ->
State1 = lock_message(AckRequired,
@@ -291,11 +292,11 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)},
hibernate};
-handle_cast(flush_multiple_acks, State) ->
- {noreply, flush_multiple(State)};
+handle_cast(flush_confirms, State) ->
+ {noreply, internal_flush_confirms(State)};
handle_cast({confirm, MsgSeqNo, From}, State) ->
- {noreply, send_or_enqueue_ack(MsgSeqNo, From, State)}.
+ {noreply, confirm(MsgSeqNo, From, State)}.
handle_info({'DOWN', _MRef, process, QPid, _Reason},
State = #ch{queues_for_msg = QFM}) ->
@@ -303,7 +304,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) ->
Qs = sets:del_element(QPid, QPids),
case sets:size(Qs) of
- 0 -> send_or_enqueue_ack(Msg, QPid, State0);
+ 0 -> confirm(Msg, QPid, State0);
_ -> State0#ch{queues_for_msg =
dict:store(Msg, Qs, QFM0)}
end
@@ -313,7 +314,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
- State1 = flush_multiple(State),
+ State1 = internal_flush_confirms(State),
rabbit_event:if_enabled(StatsTimer,
fun () ->
internal_emit_stats(
@@ -465,11 +466,11 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-send_or_enqueue_ack(undefined, _QPid, State) ->
+confirm(undefined, _QPid, State) ->
State;
-send_or_enqueue_ack(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) ->
+confirm(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) ->
State;
-send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) ->
+confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) ->
do_if_unconfirmed(MsgSeqNo, QPid,
fun(MSN, State1 = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(
@@ -477,7 +478,7 @@ send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) ->
delivery_tag = MSN}),
State1
end, State);
-send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) ->
+confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) ->
do_if_unconfirmed(MsgSeqNo, QPid,
fun(MSN, State1 = #ch{held_confirms = As}) ->
start_confirm_timer(
@@ -1231,12 +1232,12 @@ is_message_persistent(Content) ->
process_routing_result(unroutable, _, MsgSeqNo, Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_route),
- send_or_enqueue_ack(MsgSeqNo, undefined, State);
+ confirm(MsgSeqNo, undefined, State);
process_routing_result(not_delivered, _, MsgSeqNo, Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_consumers),
- send_or_enqueue_ack(MsgSeqNo, undefined, State);
+ confirm(MsgSeqNo, undefined, State);
process_routing_result(routed, [], MsgSeqNo, _, State) ->
- send_or_enqueue_ack(MsgSeqNo, undefined, State);
+ confirm(MsgSeqNo, undefined, State);
process_routing_result(routed, _, undefined, _, State) ->
State;
process_routing_result(routed, QPids, MsgSeqNo, _,
@@ -1250,6 +1251,45 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
lock_message(false, _MsgStruct, State) ->
State.
+start_confirm_timer(State = #ch{confirm_tref = undefined}) ->
+ {ok, TRef} = timer:apply_after(?FLUSH_CONFIRMS_INTERVAL,
+ ?MODULE, flush_confirms, [self()]),
+ State#ch{confirm_tref = TRef};
+start_confirm_timer(State) ->
+ State.
+
+stop_confirm_timer(State = #ch{confirm_tref = undefined}) ->
+ State;
+stop_confirm_timer(State = #ch{confirm_tref = TRef}) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State#ch{confirm_tref = undefined}.
+
+internal_flush_confirms(State = #ch{writer_pid = WriterPid,
+ held_confirms = Cs}) ->
+ case gb_sets:is_empty(Cs) of
+ true -> State#ch{confirm_tref = undefined};
+ false -> [First | Rest] = gb_sets:to_list(Cs),
+ {Mult, Inds} = find_consecutive_sequence(First, Rest),
+ ok = rabbit_writer:send_command(
+ WriterPid,
+ #'basic.ack'{delivery_tag = Mult, multiple = true}),
+ ok = lists:foldl(
+ fun(T, ok) -> rabbit_writer:send_command(
+ WriterPid,
+ #'basic.ack'{delivery_tag = T})
+ end, ok, Inds),
+ State#ch{held_confirms = gb_sets:new(),
+ confirm_tref = undefined}
+ end.
+
+%% Find longest sequence of consecutive numbers at the beginning.
+find_consecutive_sequence(Last, []) ->
+ {Last, []};
+find_consecutive_sequence(Last, [N | Ns]) when N == (Last + 1) ->
+ find_consecutive_sequence(N, Ns);
+find_consecutive_sequence(Last, Ns) ->
+ {Last, Ns}.
+
terminate(State) ->
stop_confirm_timer(State),
pg_local:leave(rabbit_channels, self()),
@@ -1337,42 +1377,3 @@ erase_queue_stats(QPid) ->
erase({queue_stats, QPid}),
[erase({queue_exchange_stats, QX}) ||
{{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0].
-
-start_confirm_timer(State = #ch{confirm_tref = undefined}) ->
- {ok, TRef} = timer:apply_after(?FLUSH_MULTIPLE_ACKS_INTERVAL,
- ?MODULE, flush_multiple_acks, [self()]),
- State#ch{confirm_tref = TRef};
-start_confirm_timer(State) ->
- State.
-
-stop_confirm_timer(State = #ch{confirm_tref = undefined}) ->
- State;
-stop_confirm_timer(State = #ch{confirm_tref = TRef}) ->
- {ok, cancel} = timer:cancel(TRef),
- State#ch{confirm_tref = undefined}.
-
-flush_multiple(State = #ch{writer_pid = WriterPid,
- held_confirms = Cs}) ->
- case gb_sets:is_empty(Cs) of
- true -> State#ch{confirm_tref = undefined};
- false -> [First | Rest] = gb_sets:to_list(Cs),
- {Mult, Inds} = find_consecutive_sequence(First, Rest),
- ok = rabbit_writer:send_command(
- WriterPid,
- #'basic.ack'{delivery_tag = Mult, multiple = true}),
- ok = lists:foldl(
- fun(T, ok) -> rabbit_writer:send_command(
- WriterPid,
- #'basic.ack'{delivery_tag = T})
- end, ok, Inds),
- State#ch{held_confirms = gb_sets:new(),
- confirm_tref = undefined}
- end.
-
-%% Find longest sequence of consecutive numbers at the beginning.
-find_consecutive_sequence(Last, []) ->
- {Last, []};
-find_consecutive_sequence(Last, [N | Ns]) when N == (Last + 1) ->
- find_consecutive_sequence(N, Ns);
-find_consecutive_sequence(Last, Ns) ->
- {Last, Ns}.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 00e479a2..7414c904 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -150,17 +150,17 @@ declare(XName, Type, Durable, AutoDelete, Args) ->
%% Used with atoms from records; e.g., the type is expected to exist.
type_to_module(T) ->
- {ok, Module} = rabbit_exchange_type_registry:lookup_module(T),
+ {ok, Module} = rabbit_registry:lookup_module(exchange, T),
Module.
%% Used with binaries sent over the wire; the type may not exist.
check_type(TypeBin) ->
- case rabbit_exchange_type_registry:binary_to_type(TypeBin) of
+ case rabbit_registry:binary_to_type(TypeBin) of
{error, not_found} ->
rabbit_misc:protocol_error(
command_invalid, "unknown exchange type '~s'", [TypeBin]);
T ->
- case rabbit_exchange_type_registry:lookup_module(T) of
+ case rabbit_registry:lookup_module(exchange, T) of
{error, not_found} -> rabbit_misc:protocol_error(
command_invalid,
"invalid exchange type '~s'", [T]);
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index d934a497..d49d0199 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -41,9 +41,9 @@
-rabbit_boot_step({?MODULE,
[{description, "exchange type direct"},
- {mfa, {rabbit_exchange_type_registry, register,
- [<<"direct">>, ?MODULE]}},
- {requires, rabbit_exchange_type_registry},
+ {mfa, {rabbit_registry, register,
+ [exchange, <<"direct">>, ?MODULE]}},
+ {requires, rabbit_registry},
{enables, kernel_ready}]}).
description() ->
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index 77ca9686..e7f75464 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -41,9 +41,9 @@
-rabbit_boot_step({?MODULE,
[{description, "exchange type fanout"},
- {mfa, {rabbit_exchange_type_registry, register,
- [<<"fanout">>, ?MODULE]}},
- {requires, rabbit_exchange_type_registry},
+ {mfa, {rabbit_registry, register,
+ [exchange, <<"fanout">>, ?MODULE]}},
+ {requires, rabbit_registry},
{enables, kernel_ready}]}).
description() ->
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index ec9e7ba4..caf141fe 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -42,9 +42,9 @@
-rabbit_boot_step({?MODULE,
[{description, "exchange type headers"},
- {mfa, {rabbit_exchange_type_registry, register,
- [<<"headers">>, ?MODULE]}},
- {requires, rabbit_exchange_type_registry},
+ {mfa, {rabbit_registry, register,
+ [exchange, <<"headers">>, ?MODULE]}},
+ {requires, rabbit_registry},
{enables, kernel_ready}]}).
-ifdef(use_specs).
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index d3ecdd4d..44851858 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -41,9 +41,9 @@
-rabbit_boot_step({?MODULE,
[{description, "exchange type topic"},
- {mfa, {rabbit_exchange_type_registry, register,
- [<<"topic">>, ?MODULE]}},
- {requires, rabbit_exchange_type_registry},
+ {mfa, {rabbit_registry, register,
+ [exchange, <<"topic">>, ?MODULE]}},
+ {requires, rabbit_registry},
{enables, kernel_ready}]}).
-export([topic_matches/2]).
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index a62e7a6f..dadfc16e 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -388,7 +388,7 @@ init_db(ClusterNodes, Force) ->
ensure_version_ok(rabbit_upgrade:read_version()),
ensure_schema_ok();
{[], false, _} ->
- %% First RAM node in cluster, start from scratch
+ %% Nothing there at all, start from scratch
ok = create_schema();
{[AnotherNode|_], _, _} ->
%% Subsequent node in cluster, catch up
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index 867ecb12..fcc93e6f 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -260,7 +260,7 @@ duplicate_node_check([]) ->
%% Ignore running node while installing windows service
ok;
duplicate_node_check(Node) ->
- {NodeName, NodeHost} = rabbit_misc:nodeparts(Node),
+ {NodeName, NodeHost} = rabbit_misc:nodeparts(rabbit_misc:makenode(Node)),
case net_adm:names(NodeHost) of
{ok, NamePorts} ->
case proplists:is_defined(NodeName, NamePorts) of
@@ -272,7 +272,6 @@ duplicate_node_check(Node) ->
terminate(?ERROR_CODE);
false -> ok
end;
- {error, address} -> ok;
{error, EpmdReason} -> terminate("unexpected epmd error: ~p~n",
[EpmdReason])
end.
@@ -283,12 +282,9 @@ terminate(Fmt, Args) ->
terminate(Status) ->
case os:type() of
- {unix, _} ->
- halt(Status);
- {win32, _} ->
- init:stop(Status),
- receive
- after infinity -> ok
- end
+ {unix, _} -> halt(Status);
+ {win32, _} -> init:stop(Status),
+ receive
+ after infinity -> ok
+ end
end.
-
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 4dd150a2..15b20bc4 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -56,14 +56,15 @@
-record(v1, {parent, sock, connection, callback, recv_length, recv_ref,
connection_state, queue_collector, heartbeater, stats_timer,
- channel_sup_sup_pid, start_heartbeat_fun}).
+ channel_sup_sup_pid, start_heartbeat_fun, auth_mechanism,
+ auth_state}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, channels]).
-define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl,
peer_cert_subject, peer_cert_issuer,
- peer_cert_validity,
+ peer_cert_validity, auth_mechanism,
protocol, user, vhost, timeout, frame_max,
client_properties]).
@@ -294,7 +295,9 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
stats_timer =
rabbit_event:init_stats_timer(),
channel_sup_sup_pid = ChannelSupSupPid,
- start_heartbeat_fun = StartHeartbeatFun
+ start_heartbeat_fun = StartHeartbeatFun,
+ auth_mechanism = none,
+ auth_state = none
},
handshake, 8))
catch
@@ -681,11 +684,12 @@ handle_input(Callback, Data, _State) ->
start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
Protocol,
State = #v1{sock = Sock, connection = Connection}) ->
- Start = #'connection.start'{ version_major = ProtocolMajor,
- version_minor = ProtocolMinor,
- server_properties = server_properties(),
- mechanisms = <<"PLAIN AMQPLAIN">>,
- locales = <<"en_US">> },
+ Start = #'connection.start'{
+ version_major = ProtocolMajor,
+ version_minor = ProtocolMinor,
+ server_properties = server_properties(),
+ mechanisms = auth_mechanisms_binary(),
+ locales = <<"en_US">> },
ok = send_on_channel0(Sock, Start, Protocol),
switch_callback(State#v1{connection = Connection#connection{
timeout_sec = ?NORMAL_TIMEOUT,
@@ -733,19 +737,22 @@ handle_method0(MethodName, FieldsBin,
handle_method0(#'connection.start_ok'{mechanism = Mechanism,
response = Response,
client_properties = ClientProperties},
- State = #v1{connection_state = starting,
- connection = Connection =
- #connection{protocol = Protocol},
- sock = Sock}) ->
- User = rabbit_access_control:check_login(Mechanism, Response),
- Tune = #'connection.tune'{channel_max = 0,
- frame_max = ?FRAME_MAX,
- heartbeat = 0},
- ok = send_on_channel0(Sock, Tune, Protocol),
- State#v1{connection_state = tuning,
- connection = Connection#connection{
- user = User,
- client_properties = ClientProperties}};
+ State0 = #v1{connection_state = starting,
+ connection = Connection,
+ sock = Sock}) ->
+ AuthMechanism = auth_mechanism_to_module(Mechanism),
+ State = State0#v1{auth_mechanism = AuthMechanism,
+ auth_state = AuthMechanism:init(Sock),
+ connection_state = securing,
+ connection =
+ Connection#connection{
+ client_properties = ClientProperties}},
+ auth_phase(Response, State);
+
+handle_method0(#'connection.secure_ok'{response = Response},
+ State = #v1{connection_state = securing}) ->
+ auth_phase(Response, State);
+
handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
heartbeat = ClientHeartbeat},
State = #v1{connection_state = tuning,
@@ -827,6 +834,60 @@ handle_method0(_Method, #v1{connection_state = S}) ->
send_on_channel0(Sock, Method, Protocol) ->
ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol).
+auth_mechanism_to_module(TypeBin) ->
+ case rabbit_registry:binary_to_type(TypeBin) of
+ {error, not_found} ->
+ rabbit_misc:protocol_error(
+ command_invalid, "unknown authentication mechanism '~s'",
+ [TypeBin]);
+ T ->
+ case {lists:member(T, auth_mechanisms()),
+ rabbit_registry:lookup_module(auth_mechanism, T)} of
+ {true, {ok, Module}} ->
+ Module;
+ _ ->
+ rabbit_misc:protocol_error(
+ command_invalid,
+ "invalid authentication mechanism '~s'", [T])
+ end
+ end.
+
+auth_mechanisms() ->
+ {ok, Configured} = application:get_env(auth_mechanisms),
+ [Name || {Name, _Module} <- rabbit_registry:lookup_all(auth_mechanism),
+ lists:member(Name, Configured)].
+
+auth_mechanisms_binary() ->
+ list_to_binary(
+ string:join(
+ [atom_to_list(A) || A <- auth_mechanisms()], " ")).
+
+auth_phase(Response,
+ State = #v1{auth_mechanism = AuthMechanism,
+ auth_state = AuthState,
+ connection = Connection =
+ #connection{protocol = Protocol},
+ sock = Sock}) ->
+ case AuthMechanism:handle_response(Response, AuthState) of
+ {refused, Reason} ->
+ rabbit_misc:protocol_error(
+ access_refused, "~s login refused: ~s",
+ [proplists:get_value(name, AuthMechanism:description()), Reason]);
+ {protocol_error, Msg, Args} ->
+ rabbit_misc:protocol_error(syntax_error, Msg, Args);
+ {challenge, Challenge, AuthState1} ->
+ Secure = #'connection.secure'{challenge = Challenge},
+ ok = send_on_channel0(Sock, Secure, Protocol),
+ State#v1{auth_state = AuthState1};
+ {ok, User} ->
+ Tune = #'connection.tune'{channel_max = 0,
+ frame_max = ?FRAME_MAX,
+ heartbeat = 0},
+ ok = send_on_channel0(Sock, Tune, Protocol),
+ State#v1{connection_state = tuning,
+ connection = Connection#connection{user = User}}
+ end.
+
%%--------------------------------------------------------------------------
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
@@ -864,6 +925,10 @@ i(protocol, #v1{connection = #connection{protocol = none}}) ->
none;
i(protocol, #v1{connection = #connection{protocol = Protocol}}) ->
Protocol:version();
+i(auth_mechanism, #v1{auth_mechanism = none}) ->
+ none;
+i(auth_mechanism, #v1{auth_mechanism = Mechanism}) ->
+ proplists:get_value(name, Mechanism:description());
i(user, #v1{connection = #connection{user = #user{username = Username}}}) ->
Username;
i(user, #v1{connection = #connection{user = none}}) ->
diff --git a/src/rabbit_exchange_type_registry.erl b/src/rabbit_registry.erl
index f15275b5..7a3fcb51 100644
--- a/src/rabbit_exchange_type_registry.erl
+++ b/src/rabbit_registry.erl
@@ -29,7 +29,7 @@
%% Contributor(s): ______________________________________.
%%
--module(rabbit_exchange_type_registry).
+-module(rabbit_registry).
-behaviour(gen_server).
@@ -38,7 +38,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--export([register/2, binary_to_type/1, lookup_module/1]).
+-export([register/3, binary_to_type/1, lookup_module/2, lookup_all/1]).
-define(SERVER, ?MODULE).
-define(ETS_NAME, ?MODULE).
@@ -46,11 +46,12 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(register/2 :: (binary(), atom()) -> 'ok').
+-spec(register/3 :: (atom(), binary(), atom()) -> 'ok').
-spec(binary_to_type/1 ::
(binary()) -> atom() | rabbit_types:error('not_found')).
--spec(lookup_module/1 ::
- (atom()) -> rabbit_types:ok_or_error2(atom(), 'not_found')).
+-spec(lookup_module/2 ::
+ (atom(), atom()) -> rabbit_types:ok_or_error2(atom(), 'not_found')).
+-spec(lookup_all/1 :: (atom()) -> [{atom(), atom()}]).
-endif.
@@ -61,8 +62,8 @@ start_link() ->
%%---------------------------------------------------------------------------
-register(TypeName, ModuleName) ->
- gen_server:call(?SERVER, {register, TypeName, ModuleName}).
+register(Class, TypeName, ModuleName) ->
+ gen_server:call(?SERVER, {register, Class, TypeName, ModuleName}).
%% This is used with user-supplied arguments (e.g., on exchange
%% declare), so we restrict it to existing atoms only. This means it
@@ -74,47 +75,54 @@ binary_to_type(TypeBin) when is_binary(TypeBin) ->
TypeAtom -> TypeAtom
end.
-lookup_module(T) when is_atom(T) ->
- case ets:lookup(?ETS_NAME, T) of
+lookup_module(Class, T) when is_atom(T) ->
+ case ets:lookup(?ETS_NAME, {Class, T}) of
[{_, Module}] ->
{ok, Module};
[] ->
{error, not_found}
end.
+lookup_all(Class) ->
+ [{K, V} || [K, V] <- ets:match(?ETS_NAME, {{Class, '$1'}, '$2'})].
+
%%---------------------------------------------------------------------------
internal_binary_to_type(TypeBin) when is_binary(TypeBin) ->
list_to_atom(binary_to_list(TypeBin)).
-internal_register(TypeName, ModuleName)
- when is_binary(TypeName), is_atom(ModuleName) ->
- ok = sanity_check_module(ModuleName),
+internal_register(Class, TypeName, ModuleName)
+ when is_atom(Class), is_binary(TypeName), is_atom(ModuleName) ->
+ ok = sanity_check_module(class_module(Class), ModuleName),
true = ets:insert(?ETS_NAME,
- {internal_binary_to_type(TypeName), ModuleName}),
+ {{Class, internal_binary_to_type(TypeName)}, ModuleName}),
ok.
-sanity_check_module(Module) ->
- case catch lists:member(rabbit_exchange_type,
+sanity_check_module(ClassModule, Module) ->
+ case catch lists:member(ClassModule,
lists:flatten(
[Bs || {Attr, Bs} <-
Module:module_info(attributes),
Attr =:= behavior orelse
Attr =:= behaviour])) of
{'EXIT', {undef, _}} -> {error, not_module};
- false -> {error, not_exchange_type};
+ false -> {error, {not_type, ClassModule}};
true -> ok
end.
+class_module(exchange) -> rabbit_exchange_type;
+class_module(auth_mechanism) -> rabbit_auth_mechanism.
+
%%---------------------------------------------------------------------------
init([]) ->
?ETS_NAME = ets:new(?ETS_NAME, [protected, set, named_table]),
{ok, none}.
-handle_call({register, TypeName, ModuleName}, _From, State) ->
- ok = internal_register(TypeName, ModuleName),
+handle_call({register, Class, TypeName, ModuleName}, _From, State) ->
+ ok = internal_register(Class, TypeName, ModuleName),
{reply, ok, State};
+
handle_call(Request, _From, State) ->
{stop, {unhandled_call, Request}, State}.
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index 1d8ce23b..a4da23e2 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -36,6 +36,7 @@
-include_lib("public_key/include/public_key.hrl").
-export([peer_cert_issuer/1, peer_cert_subject/1, peer_cert_validity/1]).
+-export([peer_cert_subject_item/2]).
%%--------------------------------------------------------------------------
@@ -45,9 +46,11 @@
-type(certificate() :: binary()).
--spec(peer_cert_issuer/1 :: (certificate()) -> string()).
--spec(peer_cert_subject/1 :: (certificate()) -> string()).
--spec(peer_cert_validity/1 :: (certificate()) -> string()).
+-spec(peer_cert_issuer/1 :: (certificate()) -> string()).
+-spec(peer_cert_subject/1 :: (certificate()) -> string()).
+-spec(peer_cert_validity/1 :: (certificate()) -> string()).
+-spec(peer_cert_subject_item/2 ::
+ (certificate(), tuple()) -> string() | 'not_found').
-endif.
@@ -71,6 +74,14 @@ peer_cert_subject(Cert) ->
format_rdn_sequence(Subject)
end, Cert).
+%% Return a part of the certificate's subject.
+peer_cert_subject_item(Cert, Type) ->
+ cert_info(fun(#'OTPCertificate' {
+ tbsCertificate = #'OTPTBSCertificate' {
+ subject = Subject }}) ->
+ find_by_type(Type, Subject)
+ end, Cert).
+
%% Return a string describing the certificate's validity.
peer_cert_validity(Cert) ->
cert_info(fun(#'OTPCertificate' {
@@ -89,6 +100,14 @@ cert_info(F, Cert) ->
DecCert -> DecCert %%R14B onwards
end).
+find_by_type(Type, {rdnSequence, RDNs}) ->
+ case [V || #'AttributeTypeAndValue'{type = T, value = V}
+ <- lists:flatten(RDNs),
+ T == Type] of
+ [{printableString, S}] -> S;
+ [] -> not_found
+ end.
+
%%--------------------------------------------------------------------------
%% Formatting functions
%%--------------------------------------------------------------------------
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 0db51165..bf7d2a5d 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -323,7 +323,7 @@
timestamp :: timestamp() }).
-type(delta() :: #delta { start_seq_id :: non_neg_integer(),
- count :: non_neg_integer (),
+ count :: non_neg_integer(),
end_seq_id :: non_neg_integer() }).
-type(sync() :: #sync { acks_persistent :: [[seq_id()]],