summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-12-14 12:37:46 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2010-12-14 12:37:46 +0000
commit4af08b87800ebb8156e5b95e31e0de9bff0a27bc (patch)
treed8cd9ab7dbe5e89e1f8c6caecb5775570b6aca4d
parentd8c97cf013c19cb19ba0d3235b4b030f4ed2690b (diff)
parent6fc0feb3e036f45a226e2e323cb018758a5e8db7 (diff)
downloadrabbitmq-server-4af08b87800ebb8156e5b95e31e0de9bff0a27bc.tar.gz
Merging default into bug23554
-rw-r--r--docs/rabbitmqctl.1.xml23
-rw-r--r--ebin/rabbit_app.in3
-rw-r--r--include/rabbit_auth_mechanism_spec.hrl41
-rw-r--r--src/rabbit.erl6
-rw-r--r--src/rabbit_access_control.erl61
-rw-r--r--src/rabbit_amqqueue.erl4
-rw-r--r--src/rabbit_amqqueue_process.erl17
-rw-r--r--src/rabbit_auth_mechanism.erl57
-rw-r--r--src/rabbit_auth_mechanism_amqplain.erl70
-rw-r--r--src/rabbit_auth_mechanism_cr_demo.erl74
-rw-r--r--src/rabbit_auth_mechanism_external.erl107
-rw-r--r--src/rabbit_auth_mechanism_plain.erl61
-rw-r--r--src/rabbit_backing_queue.erl2
-rw-r--r--src/rabbit_binding.erl2
-rw-r--r--src/rabbit_control.erl4
-rw-r--r--src/rabbit_exchange.erl6
-rw-r--r--src/rabbit_exchange_type_direct.erl6
-rw-r--r--src/rabbit_exchange_type_fanout.erl6
-rw-r--r--src/rabbit_exchange_type_headers.erl6
-rw-r--r--src/rabbit_exchange_type_topic.erl6
-rw-r--r--src/rabbit_prelaunch.erl9
-rw-r--r--src/rabbit_reader.erl107
-rw-r--r--src/rabbit_registry.erl (renamed from src/rabbit_exchange_type_registry.erl)44
-rw-r--r--src/rabbit_ssl.erl25
-rw-r--r--src/rabbit_variable_queue.erl78
25 files changed, 664 insertions, 161 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 6b02abe4..35de1cea 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -466,6 +466,25 @@
</varlistentry>
<varlistentry>
+ <term><cmdsynopsis><command>clear_password</command> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>username</term>
+ <listitem><para>The name of the user whose password is to be cleared.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl clear_password tonyg</screen>
+ <para role="example">
+ This command instructs the RabbitMQ broker to clear the
+ password for the user named
+ <command>tonyg</command>. This user now cannot log in with a password (but may be able to through e.g. SASL EXTERNAL if configured).
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
<term><cmdsynopsis><command>set_admin</command> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term>
<listitem>
<variablelist>
@@ -1005,6 +1024,10 @@
<listitem><para>Version of the AMQP protocol in use (currently one of <command>{0,9,1}</command> or <command>{0,8,0}</command>). Note that if a client requests an AMQP 0-9 connection, we treat it as AMQP 0-9-1.</para></listitem>
</varlistentry>
<varlistentry>
+ <term>auth_mechanism</term>
+ <listitem><para>SASL authentication mechanism used, such as <command>PLAIN</command>.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>user</term>
<listitem><para>Username associated with the connection.</para></listitem>
</varlistentry>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 6c33ef8b..3888f198 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -30,4 +30,5 @@
{default_permissions, [<<".*">>, <<".*">>, <<".*">>]},
{cluster_nodes, []},
{server_properties, []},
- {collect_statistics, none}]}]}.
+ {collect_statistics, none},
+ {auth_mechanisms, ['PLAIN', 'AMQPLAIN']} ]} ]}.
diff --git a/include/rabbit_auth_mechanism_spec.hrl b/include/rabbit_auth_mechanism_spec.hrl
new file mode 100644
index 00000000..93aa40bd
--- /dev/null
+++ b/include/rabbit_auth_mechanism_spec.hrl
@@ -0,0 +1,41 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+-ifdef(use_specs).
+
+-spec(description/0 :: () -> [{atom(), any()}]).
+-spec(init/1 :: (rabbit_net:socket()) -> any()).
+-spec(handle_response/2 :: (binary(), any()) ->
+ {'ok', rabbit_types:user()} |
+ {'challenge', binary(), any()} |
+ {'protocol_error', string(), [any()]} |
+ {'refused', rabbit_access_control:username()}).
+
+-endif.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index ace8f286..2ebfdecf 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -69,10 +69,10 @@
-rabbit_boot_step({external_infrastructure,
[{description, "external infrastructure ready"}]}).
--rabbit_boot_step({rabbit_exchange_type_registry,
- [{description, "exchange type registry"},
+-rabbit_boot_step({rabbit_registry,
+ [{description, "plugin registry"},
{mfa, {rabbit_sup, start_child,
- [rabbit_exchange_type_registry]}},
+ [rabbit_registry]}},
{requires, external_infrastructure},
{enables, kernel_ready}]}).
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index bc588013..f2d2b016 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -33,10 +33,10 @@
-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
--export([check_login/2, user_pass_login/2, check_user_pass_login/2,
+-export([user_pass_login/2, check_user_pass_login/2, make_salt/0,
check_vhost_access/2, check_resource_access/3]).
-export([add_user/2, delete_user/1, change_password/2, set_admin/1,
- clear_admin/1, list_users/0, lookup_user/1]).
+ clear_admin/1, list_users/0, lookup_user/1, clear_password/1]).
-export([change_password_hash/2, hash_password/1]).
-export([add_vhost/1, delete_vhost/1, vhost_exists/1, list_vhosts/0]).
-export([set_permissions/5, clear_permissions/2,
@@ -54,15 +54,13 @@
-type(password() :: binary()).
-type(password_hash() :: binary()).
-type(regexp() :: binary()).
--spec(check_login/2 ::
- (binary(), binary()) -> rabbit_types:user() |
- rabbit_types:channel_exit()).
-spec(user_pass_login/2 ::
(username(), password())
-> rabbit_types:user() | rabbit_types:channel_exit()).
-spec(check_user_pass_login/2 ::
(username(), password())
- -> {'ok', rabbit_types:user()} | 'refused').
+ -> {'ok', rabbit_types:user()} | {'refused', username()}).
+-spec(make_salt/0 :: () -> binary()).
-spec(check_vhost_access/2 ::
(rabbit_types:user(), rabbit_types:vhost())
-> 'ok' | rabbit_types:channel_exit()).
@@ -72,6 +70,7 @@
-spec(add_user/2 :: (username(), password()) -> 'ok').
-spec(delete_user/1 :: (username()) -> 'ok').
-spec(change_password/2 :: (username(), password()) -> 'ok').
+-spec(clear_password/1 :: (username()) -> 'ok').
-spec(change_password_hash/2 :: (username(), password_hash()) -> 'ok').
-spec(hash_password/1 :: (password()) -> password_hash()).
-spec(set_admin/1 :: (username()) -> 'ok').
@@ -100,54 +99,27 @@
%%----------------------------------------------------------------------------
-%% SASL PLAIN, as used by the Qpid Java client and our clients. Also,
-%% apparently, by OpenAMQ.
-check_login(<<"PLAIN">>, Response) ->
- [User, Pass] = [list_to_binary(T) ||
- T <- string:tokens(binary_to_list(Response), [0])],
- user_pass_login(User, Pass);
-%% AMQPLAIN, as used by Qpid Python test suite. The 0-8 spec actually
-%% defines this as PLAIN, but in 0-9 that definition is gone, instead
-%% referring generically to "SASL security mechanism", i.e. the above.
-check_login(<<"AMQPLAIN">>, Response) ->
- LoginTable = rabbit_binary_parser:parse_table(Response),
- case {lists:keysearch(<<"LOGIN">>, 1, LoginTable),
- lists:keysearch(<<"PASSWORD">>, 1, LoginTable)} of
- {{value, {_, longstr, User}},
- {value, {_, longstr, Pass}}} ->
- user_pass_login(User, Pass);
- _ ->
- %% Is this an information leak?
- rabbit_misc:protocol_error(
- access_refused,
- "AMQPPLAIN auth info ~w is missing LOGIN or PASSWORD field",
- [LoginTable])
- end;
-
-check_login(Mechanism, _Response) ->
- rabbit_misc:protocol_error(
- access_refused, "unsupported authentication mechanism '~s'",
- [Mechanism]).
-
user_pass_login(User, Pass) ->
?LOGDEBUG("Login with user ~p pass ~p~n", [User, Pass]),
case check_user_pass_login(User, Pass) of
- refused ->
+ {refused, _} ->
rabbit_misc:protocol_error(
access_refused, "login refused for user '~s'", [User]);
{ok, U} ->
U
end.
-check_user_pass_login(User, Pass) ->
- case lookup_user(User) of
- {ok, U} ->
- case check_password(Pass, U#user.password_hash) of
- true -> {ok, U};
- _ -> refused
+check_user_pass_login(Username, Pass) ->
+ Refused = {refused, io_lib:format("user '~s' - invalid credentials",
+ [Username])},
+ case lookup_user(Username) of
+ {ok, User} ->
+ case check_password(Pass, User#user.password_hash) of
+ true -> {ok, User};
+ _ -> Refused
end;
{error, not_found} ->
- refused
+ Refused
end.
internal_lookup_vhost_access(Username, VHostPath) ->
@@ -250,6 +222,9 @@ delete_user(Username) ->
change_password(Username, Password) ->
change_password_hash(Username, hash_password(Password)).
+clear_password(Username) ->
+ change_password_hash(Username, <<"">>).
+
change_password_hash(Username, PasswordHash) ->
R = update_user(Username, fun(User) ->
User#user{ password_hash = PasswordHash }
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 94c05f8b..1e83265f 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -152,9 +152,9 @@
(name()) -> rabbit_types:ok_or_error('not_found') |
rabbit_types:connection_exit()).
-spec(maybe_run_queue_via_backing_queue/2 ::
- (pid(), (fun ((A) -> A | {any(), A}))) -> 'ok').
+ (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
-spec(maybe_run_queue_via_backing_queue_async/2 ::
- (pid(), (fun ((A) -> A | {any(), A}))) -> 'ok').
+ (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a7468936..6278d5a1 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -258,7 +258,7 @@ ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) ->
{ok, TRef} = timer:apply_after(
?SYNC_INTERVAL,
rabbit_amqqueue, maybe_run_queue_via_backing_queue,
- [self(), fun (BQS) -> BQ:idle_timeout(BQS) end]),
+ [self(), fun (BQS) -> {[], BQ:idle_timeout(BQS)} end]),
State#q{sync_timer_ref = TRef};
ensure_sync_timer(State) ->
State.
@@ -549,12 +549,12 @@ deliver_or_enqueue(Delivery, State) ->
{false, ensure_ttl_timer(State1#q{backing_queue_state = BQS1})}
end.
-requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) ->
+requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl = TTL}) ->
maybe_run_queue_via_backing_queue(
fun (BQS) ->
{_Guids, BQS1} =
BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS),
- BQS1
+ {[], BQS1}
end, State).
fetch(AckRequired, State = #q{backing_queue_state = BQS,
@@ -651,12 +651,9 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
- {BQS2, State1} =
- case Fun(BQS) of
- {{confirm, Guids}, BQS1} -> {BQS1, confirm_messages(Guids, State)};
- BQS1 -> {BQS1, State}
- end,
- run_message_queue(State1#q{backing_queue_state = BQS2}).
+ {Guids, BQS1} = Fun(BQS),
+ run_message_queue(
+ confirm_messages(Guids, State#q{backing_queue_state = BQS1})).
commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
backing_queue_state = BQS,
@@ -1141,7 +1138,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_info(timeout, State = #q{backing_queue = BQ}) ->
noreply(maybe_run_queue_via_backing_queue(
- fun (BQS) -> BQ:idle_timeout(BQS) end, State));
+ fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State));
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl
new file mode 100644
index 00000000..1258cb8d
--- /dev/null
+++ b/src/rabbit_auth_mechanism.erl
@@ -0,0 +1,57 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_mechanism).
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [
+ %% A description.
+ {description, 0},
+
+ %% Called before authentication starts. Should create a state
+ %% object to be passed through all the stages of authentication.
+ {init, 1},
+
+ %% Handle a stage of authentication. Possible responses:
+ %% {ok, User}
+ %% Authentication succeeded, and here's the user record.
+ %% {challenge, Challenge, NextState}
+ %% Another round is needed. Here's the state I want next time.
+ %% {protocol_error, Msg, Args}
+ %% Client got the protocol wrong. Log and die.
+ %% {refused, Username}
+ %% Client failed authentication. Log and die.
+ {handle_response, 2}
+ ];
+behaviour_info(_Other) ->
+ undefined.
diff --git a/src/rabbit_auth_mechanism_amqplain.erl b/src/rabbit_auth_mechanism_amqplain.erl
new file mode 100644
index 00000000..5d51d904
--- /dev/null
+++ b/src/rabbit_auth_mechanism_amqplain.erl
@@ -0,0 +1,70 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_mechanism_amqplain).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_auth_mechanism).
+
+-export([description/0, init/1, handle_response/2]).
+
+-include("rabbit_auth_mechanism_spec.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "auth mechanism amqplain"},
+ {mfa, {rabbit_registry, register,
+ [auth_mechanism, <<"AMQPLAIN">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+%% AMQPLAIN, as used by Qpid Python test suite. The 0-8 spec actually
+%% defines this as PLAIN, but in 0-9 that definition is gone, instead
+%% referring generically to "SASL security mechanism", i.e. the above.
+
+description() ->
+ [{name, <<"AMQPLAIN">>},
+ {description, <<"QPid AMQPLAIN mechanism">>}].
+
+init(_Sock) ->
+ [].
+
+handle_response(Response, _State) ->
+ LoginTable = rabbit_binary_parser:parse_table(Response),
+ case {lists:keysearch(<<"LOGIN">>, 1, LoginTable),
+ lists:keysearch(<<"PASSWORD">>, 1, LoginTable)} of
+ {{value, {_, longstr, User}},
+ {value, {_, longstr, Pass}}} ->
+ rabbit_access_control:check_user_pass_login(User, Pass);
+ _ ->
+ {protocol_error,
+ "AMQPLAIN auth info ~w is missing LOGIN or PASSWORD field",
+ [LoginTable]}
+ end.
diff --git a/src/rabbit_auth_mechanism_cr_demo.erl b/src/rabbit_auth_mechanism_cr_demo.erl
new file mode 100644
index 00000000..67665928
--- /dev/null
+++ b/src/rabbit_auth_mechanism_cr_demo.erl
@@ -0,0 +1,74 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_mechanism_cr_demo).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_auth_mechanism).
+
+-export([description/0, init/1, handle_response/2]).
+
+-include("rabbit_auth_mechanism_spec.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "auth mechanism cr-demo"},
+ {mfa, {rabbit_registry, register,
+ [auth_mechanism, <<"RABBIT-CR-DEMO">>,
+ ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+-record(state, {username = undefined}).
+
+%% Provides equivalent security to PLAIN but demos use of Connection.Secure(Ok)
+%% START-OK: Username
+%% SECURE: "Please tell me your password"
+%% SECURE-OK: "My password is ~s", [Password]
+
+description() ->
+ [{name, <<"RABBIT-CR-DEMO">>},
+ {description, <<"RabbitMQ Demo challenge-response authentication "
+ "mechanism">>}].
+
+init(_Sock) ->
+ #state{}.
+
+handle_response(Response, State = #state{username = undefined}) ->
+ {challenge, <<"Please tell me your password">>,
+ State#state{username = Response}};
+
+handle_response(Response, #state{username = Username}) ->
+ case Response of
+ <<"My password is ", Password/binary>> ->
+ rabbit_access_control:check_user_pass_login(Username, Password);
+ _ ->
+ {protocol_error, "Invalid response '~s'", [Response]}
+ end.
diff --git a/src/rabbit_auth_mechanism_external.erl b/src/rabbit_auth_mechanism_external.erl
new file mode 100644
index 00000000..b21dd313
--- /dev/null
+++ b/src/rabbit_auth_mechanism_external.erl
@@ -0,0 +1,107 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_mechanism_external).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_auth_mechanism).
+
+-export([description/0, init/1, handle_response/2]).
+
+-include("rabbit_auth_mechanism_spec.hrl").
+
+-include_lib("public_key/include/public_key.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "auth mechanism external"},
+ {mfa, {rabbit_registry, register,
+ [auth_mechanism, <<"EXTERNAL">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+-record(state, {username = undefined}).
+
+%% SASL EXTERNAL. SASL says EXTERNAL means "use credentials
+%% established by means external to the mechanism". We define that to
+%% mean the peer certificate's subject's CN.
+
+description() ->
+ [{name, <<"EXTERNAL">>},
+ {description, <<"SASL EXTERNAL authentication mechanism">>}].
+
+init(Sock) ->
+ Username = case rabbit_net:peercert(Sock) of
+ {ok, C} ->
+ CN = case rabbit_ssl:peer_cert_subject_item(
+ C, ?'id-at-commonName') of
+ not_found -> {refused, "no CN found"};
+ CN0 -> list_to_binary(CN0)
+ end,
+ case config_sane() of
+ true -> CN;
+ false -> {refused, "configuration unsafe"}
+ end;
+ {error, no_peercert} ->
+ {refused, "no peer certificate"};
+ nossl ->
+ {refused, "not SSL connection"}
+ end,
+ #state{username = Username}.
+
+handle_response(_Response, #state{username = Username}) ->
+ case Username of
+ {refused, _} = E ->
+ E;
+ _ ->
+ case rabbit_access_control:lookup_user(Username) of
+ {ok, User} ->
+ {ok, User};
+ {error, not_found} ->
+ %% This is not an information leak as we have to
+ %% have validated a client cert to get this far.
+ {refused, io_lib:format("user '~s' not found", [Username])}
+ end
+ end.
+
+%%--------------------------------------------------------------------------
+
+config_sane() ->
+ {ok, Opts} = application:get_env(ssl_options),
+ case {proplists:get_value(fail_if_no_peer_cert, Opts),
+ proplists:get_value(verify, Opts)} of
+ {true, verify_peer} ->
+ true;
+ {F, V} ->
+ rabbit_log:warning("EXTERNAL mechanism disabled, "
+ "fail_if_no_peer_cert=~p; "
+ "verify=~p~n", [F, V]),
+ false
+ end.
diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl
new file mode 100644
index 00000000..8758f85f
--- /dev/null
+++ b/src/rabbit_auth_mechanism_plain.erl
@@ -0,0 +1,61 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_mechanism_plain).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_auth_mechanism).
+
+-export([description/0, init/1, handle_response/2]).
+
+-include("rabbit_auth_mechanism_spec.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "auth mechanism plain"},
+ {mfa, {rabbit_registry, register,
+ [auth_mechanism, <<"PLAIN">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+%% SASL PLAIN, as used by the Qpid Java client and our clients. Also,
+%% apparently, by OpenAMQ.
+
+description() ->
+ [{name, <<"PLAIN">>},
+ {description, <<"SASL PLAIN authentication mechanism">>}].
+
+init(_Sock) ->
+ [].
+
+handle_response(Response, _State) ->
+ [User, Pass] = [list_to_binary(T) ||
+ T <- string:tokens(binary_to_list(Response), [0])],
+ rabbit_access_control:check_user_pass_login(User, Pass).
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index d04944f9..7a728498 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -77,7 +77,7 @@ behaviour_info(callbacks) ->
{fetch, 2},
%% Acktags supplied are for messages which can now be forgotten
- %% about.
+ %% about. Must return 1 guid per Ack, in the same order as Acks.
{ack, 2},
%% A publish, but in the context of a transaction.
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 668fb9bb..ccadf5af 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -305,7 +305,7 @@ table_for_resource(#resource{kind = queue}) -> rabbit_queue.
%% Used with atoms from records; e.g., the type is expected to exist.
type_to_module(T) ->
- {ok, Module} = rabbit_exchange_type_registry:lookup_module(T),
+ {ok, Module} = rabbit_registry:lookup_module(exchange, T),
Module.
contains(Table, MatchHead) ->
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 360217a2..df55d961 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -211,6 +211,10 @@ action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) ->
Inform("Changing password for user ~p", [Username]),
call(Node, {rabbit_access_control, change_password, Args});
+action(clear_password, Node, Args = [Username], _Opts, Inform) ->
+ Inform("Clearing password for user ~p", [Username]),
+ call(Node, {rabbit_access_control, clear_password, Args});
+
action(set_admin, Node, [Username], _Opts, Inform) ->
Inform("Setting administrative status for user ~p", [Username]),
call(Node, {rabbit_access_control, set_admin, [Username]});
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 00e479a2..7414c904 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -150,17 +150,17 @@ declare(XName, Type, Durable, AutoDelete, Args) ->
%% Used with atoms from records; e.g., the type is expected to exist.
type_to_module(T) ->
- {ok, Module} = rabbit_exchange_type_registry:lookup_module(T),
+ {ok, Module} = rabbit_registry:lookup_module(exchange, T),
Module.
%% Used with binaries sent over the wire; the type may not exist.
check_type(TypeBin) ->
- case rabbit_exchange_type_registry:binary_to_type(TypeBin) of
+ case rabbit_registry:binary_to_type(TypeBin) of
{error, not_found} ->
rabbit_misc:protocol_error(
command_invalid, "unknown exchange type '~s'", [TypeBin]);
T ->
- case rabbit_exchange_type_registry:lookup_module(T) of
+ case rabbit_registry:lookup_module(exchange, T) of
{error, not_found} -> rabbit_misc:protocol_error(
command_invalid,
"invalid exchange type '~s'", [T]);
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index d934a497..d49d0199 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -41,9 +41,9 @@
-rabbit_boot_step({?MODULE,
[{description, "exchange type direct"},
- {mfa, {rabbit_exchange_type_registry, register,
- [<<"direct">>, ?MODULE]}},
- {requires, rabbit_exchange_type_registry},
+ {mfa, {rabbit_registry, register,
+ [exchange, <<"direct">>, ?MODULE]}},
+ {requires, rabbit_registry},
{enables, kernel_ready}]}).
description() ->
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index 77ca9686..e7f75464 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -41,9 +41,9 @@
-rabbit_boot_step({?MODULE,
[{description, "exchange type fanout"},
- {mfa, {rabbit_exchange_type_registry, register,
- [<<"fanout">>, ?MODULE]}},
- {requires, rabbit_exchange_type_registry},
+ {mfa, {rabbit_registry, register,
+ [exchange, <<"fanout">>, ?MODULE]}},
+ {requires, rabbit_registry},
{enables, kernel_ready}]}).
description() ->
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index ec9e7ba4..caf141fe 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -42,9 +42,9 @@
-rabbit_boot_step({?MODULE,
[{description, "exchange type headers"},
- {mfa, {rabbit_exchange_type_registry, register,
- [<<"headers">>, ?MODULE]}},
- {requires, rabbit_exchange_type_registry},
+ {mfa, {rabbit_registry, register,
+ [exchange, <<"headers">>, ?MODULE]}},
+ {requires, rabbit_registry},
{enables, kernel_ready}]}).
-ifdef(use_specs).
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index d3ecdd4d..44851858 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -41,9 +41,9 @@
-rabbit_boot_step({?MODULE,
[{description, "exchange type topic"},
- {mfa, {rabbit_exchange_type_registry, register,
- [<<"topic">>, ?MODULE]}},
- {requires, rabbit_exchange_type_registry},
+ {mfa, {rabbit_registry, register,
+ [exchange, <<"topic">>, ?MODULE]}},
+ {requires, rabbit_registry},
{enables, kernel_ready}]}).
-export([topic_matches/2]).
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index fcc93e6f..8ae45abd 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -53,7 +53,7 @@ start() ->
io:format("Activating RabbitMQ plugins ...~n"),
%% Determine our various directories
- [PluginDir, UnpackedPluginDir, Node] = init:get_plain_arguments(),
+ [PluginDir, UnpackedPluginDir, NodeStr] = init:get_plain_arguments(),
RootName = UnpackedPluginDir ++ "/rabbit",
%% Unpack any .ez plugins
@@ -132,7 +132,7 @@ start() ->
|| App <- PluginApps],
io:nl(),
- ok = duplicate_node_check(Node),
+ ok = duplicate_node_check(NodeStr),
terminate(0),
ok.
@@ -259,8 +259,9 @@ process_entry(Entry) ->
duplicate_node_check([]) ->
%% Ignore running node while installing windows service
ok;
-duplicate_node_check(Node) ->
- {NodeName, NodeHost} = rabbit_misc:nodeparts(rabbit_misc:makenode(Node)),
+duplicate_node_check(NodeStr) ->
+ Node = rabbit_misc:makenode(NodeStr),
+ {NodeName, NodeHost} = rabbit_misc:nodeparts(Node),
case net_adm:names(NodeHost) of
{ok, NamePorts} ->
case proplists:is_defined(NodeName, NamePorts) of
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 4dd150a2..15b20bc4 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -56,14 +56,15 @@
-record(v1, {parent, sock, connection, callback, recv_length, recv_ref,
connection_state, queue_collector, heartbeater, stats_timer,
- channel_sup_sup_pid, start_heartbeat_fun}).
+ channel_sup_sup_pid, start_heartbeat_fun, auth_mechanism,
+ auth_state}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, channels]).
-define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl,
peer_cert_subject, peer_cert_issuer,
- peer_cert_validity,
+ peer_cert_validity, auth_mechanism,
protocol, user, vhost, timeout, frame_max,
client_properties]).
@@ -294,7 +295,9 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
stats_timer =
rabbit_event:init_stats_timer(),
channel_sup_sup_pid = ChannelSupSupPid,
- start_heartbeat_fun = StartHeartbeatFun
+ start_heartbeat_fun = StartHeartbeatFun,
+ auth_mechanism = none,
+ auth_state = none
},
handshake, 8))
catch
@@ -681,11 +684,12 @@ handle_input(Callback, Data, _State) ->
start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
Protocol,
State = #v1{sock = Sock, connection = Connection}) ->
- Start = #'connection.start'{ version_major = ProtocolMajor,
- version_minor = ProtocolMinor,
- server_properties = server_properties(),
- mechanisms = <<"PLAIN AMQPLAIN">>,
- locales = <<"en_US">> },
+ Start = #'connection.start'{
+ version_major = ProtocolMajor,
+ version_minor = ProtocolMinor,
+ server_properties = server_properties(),
+ mechanisms = auth_mechanisms_binary(),
+ locales = <<"en_US">> },
ok = send_on_channel0(Sock, Start, Protocol),
switch_callback(State#v1{connection = Connection#connection{
timeout_sec = ?NORMAL_TIMEOUT,
@@ -733,19 +737,22 @@ handle_method0(MethodName, FieldsBin,
handle_method0(#'connection.start_ok'{mechanism = Mechanism,
response = Response,
client_properties = ClientProperties},
- State = #v1{connection_state = starting,
- connection = Connection =
- #connection{protocol = Protocol},
- sock = Sock}) ->
- User = rabbit_access_control:check_login(Mechanism, Response),
- Tune = #'connection.tune'{channel_max = 0,
- frame_max = ?FRAME_MAX,
- heartbeat = 0},
- ok = send_on_channel0(Sock, Tune, Protocol),
- State#v1{connection_state = tuning,
- connection = Connection#connection{
- user = User,
- client_properties = ClientProperties}};
+ State0 = #v1{connection_state = starting,
+ connection = Connection,
+ sock = Sock}) ->
+ AuthMechanism = auth_mechanism_to_module(Mechanism),
+ State = State0#v1{auth_mechanism = AuthMechanism,
+ auth_state = AuthMechanism:init(Sock),
+ connection_state = securing,
+ connection =
+ Connection#connection{
+ client_properties = ClientProperties}},
+ auth_phase(Response, State);
+
+handle_method0(#'connection.secure_ok'{response = Response},
+ State = #v1{connection_state = securing}) ->
+ auth_phase(Response, State);
+
handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
heartbeat = ClientHeartbeat},
State = #v1{connection_state = tuning,
@@ -827,6 +834,60 @@ handle_method0(_Method, #v1{connection_state = S}) ->
send_on_channel0(Sock, Method, Protocol) ->
ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol).
+auth_mechanism_to_module(TypeBin) ->
+ case rabbit_registry:binary_to_type(TypeBin) of
+ {error, not_found} ->
+ rabbit_misc:protocol_error(
+ command_invalid, "unknown authentication mechanism '~s'",
+ [TypeBin]);
+ T ->
+ case {lists:member(T, auth_mechanisms()),
+ rabbit_registry:lookup_module(auth_mechanism, T)} of
+ {true, {ok, Module}} ->
+ Module;
+ _ ->
+ rabbit_misc:protocol_error(
+ command_invalid,
+ "invalid authentication mechanism '~s'", [T])
+ end
+ end.
+
+auth_mechanisms() ->
+ {ok, Configured} = application:get_env(auth_mechanisms),
+ [Name || {Name, _Module} <- rabbit_registry:lookup_all(auth_mechanism),
+ lists:member(Name, Configured)].
+
+auth_mechanisms_binary() ->
+ list_to_binary(
+ string:join(
+ [atom_to_list(A) || A <- auth_mechanisms()], " ")).
+
+auth_phase(Response,
+ State = #v1{auth_mechanism = AuthMechanism,
+ auth_state = AuthState,
+ connection = Connection =
+ #connection{protocol = Protocol},
+ sock = Sock}) ->
+ case AuthMechanism:handle_response(Response, AuthState) of
+ {refused, Reason} ->
+ rabbit_misc:protocol_error(
+ access_refused, "~s login refused: ~s",
+ [proplists:get_value(name, AuthMechanism:description()), Reason]);
+ {protocol_error, Msg, Args} ->
+ rabbit_misc:protocol_error(syntax_error, Msg, Args);
+ {challenge, Challenge, AuthState1} ->
+ Secure = #'connection.secure'{challenge = Challenge},
+ ok = send_on_channel0(Sock, Secure, Protocol),
+ State#v1{auth_state = AuthState1};
+ {ok, User} ->
+ Tune = #'connection.tune'{channel_max = 0,
+ frame_max = ?FRAME_MAX,
+ heartbeat = 0},
+ ok = send_on_channel0(Sock, Tune, Protocol),
+ State#v1{connection_state = tuning,
+ connection = Connection#connection{user = User}}
+ end.
+
%%--------------------------------------------------------------------------
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
@@ -864,6 +925,10 @@ i(protocol, #v1{connection = #connection{protocol = none}}) ->
none;
i(protocol, #v1{connection = #connection{protocol = Protocol}}) ->
Protocol:version();
+i(auth_mechanism, #v1{auth_mechanism = none}) ->
+ none;
+i(auth_mechanism, #v1{auth_mechanism = Mechanism}) ->
+ proplists:get_value(name, Mechanism:description());
i(user, #v1{connection = #connection{user = #user{username = Username}}}) ->
Username;
i(user, #v1{connection = #connection{user = none}}) ->
diff --git a/src/rabbit_exchange_type_registry.erl b/src/rabbit_registry.erl
index f15275b5..7a3fcb51 100644
--- a/src/rabbit_exchange_type_registry.erl
+++ b/src/rabbit_registry.erl
@@ -29,7 +29,7 @@
%% Contributor(s): ______________________________________.
%%
--module(rabbit_exchange_type_registry).
+-module(rabbit_registry).
-behaviour(gen_server).
@@ -38,7 +38,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--export([register/2, binary_to_type/1, lookup_module/1]).
+-export([register/3, binary_to_type/1, lookup_module/2, lookup_all/1]).
-define(SERVER, ?MODULE).
-define(ETS_NAME, ?MODULE).
@@ -46,11 +46,12 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(register/2 :: (binary(), atom()) -> 'ok').
+-spec(register/3 :: (atom(), binary(), atom()) -> 'ok').
-spec(binary_to_type/1 ::
(binary()) -> atom() | rabbit_types:error('not_found')).
--spec(lookup_module/1 ::
- (atom()) -> rabbit_types:ok_or_error2(atom(), 'not_found')).
+-spec(lookup_module/2 ::
+ (atom(), atom()) -> rabbit_types:ok_or_error2(atom(), 'not_found')).
+-spec(lookup_all/1 :: (atom()) -> [{atom(), atom()}]).
-endif.
@@ -61,8 +62,8 @@ start_link() ->
%%---------------------------------------------------------------------------
-register(TypeName, ModuleName) ->
- gen_server:call(?SERVER, {register, TypeName, ModuleName}).
+register(Class, TypeName, ModuleName) ->
+ gen_server:call(?SERVER, {register, Class, TypeName, ModuleName}).
%% This is used with user-supplied arguments (e.g., on exchange
%% declare), so we restrict it to existing atoms only. This means it
@@ -74,47 +75,54 @@ binary_to_type(TypeBin) when is_binary(TypeBin) ->
TypeAtom -> TypeAtom
end.
-lookup_module(T) when is_atom(T) ->
- case ets:lookup(?ETS_NAME, T) of
+lookup_module(Class, T) when is_atom(T) ->
+ case ets:lookup(?ETS_NAME, {Class, T}) of
[{_, Module}] ->
{ok, Module};
[] ->
{error, not_found}
end.
+lookup_all(Class) ->
+ [{K, V} || [K, V] <- ets:match(?ETS_NAME, {{Class, '$1'}, '$2'})].
+
%%---------------------------------------------------------------------------
internal_binary_to_type(TypeBin) when is_binary(TypeBin) ->
list_to_atom(binary_to_list(TypeBin)).
-internal_register(TypeName, ModuleName)
- when is_binary(TypeName), is_atom(ModuleName) ->
- ok = sanity_check_module(ModuleName),
+internal_register(Class, TypeName, ModuleName)
+ when is_atom(Class), is_binary(TypeName), is_atom(ModuleName) ->
+ ok = sanity_check_module(class_module(Class), ModuleName),
true = ets:insert(?ETS_NAME,
- {internal_binary_to_type(TypeName), ModuleName}),
+ {{Class, internal_binary_to_type(TypeName)}, ModuleName}),
ok.
-sanity_check_module(Module) ->
- case catch lists:member(rabbit_exchange_type,
+sanity_check_module(ClassModule, Module) ->
+ case catch lists:member(ClassModule,
lists:flatten(
[Bs || {Attr, Bs} <-
Module:module_info(attributes),
Attr =:= behavior orelse
Attr =:= behaviour])) of
{'EXIT', {undef, _}} -> {error, not_module};
- false -> {error, not_exchange_type};
+ false -> {error, {not_type, ClassModule}};
true -> ok
end.
+class_module(exchange) -> rabbit_exchange_type;
+class_module(auth_mechanism) -> rabbit_auth_mechanism.
+
%%---------------------------------------------------------------------------
init([]) ->
?ETS_NAME = ets:new(?ETS_NAME, [protected, set, named_table]),
{ok, none}.
-handle_call({register, TypeName, ModuleName}, _From, State) ->
- ok = internal_register(TypeName, ModuleName),
+handle_call({register, Class, TypeName, ModuleName}, _From, State) ->
+ ok = internal_register(Class, TypeName, ModuleName),
{reply, ok, State};
+
handle_call(Request, _From, State) ->
{stop, {unhandled_call, Request}, State}.
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index 1d8ce23b..a4da23e2 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -36,6 +36,7 @@
-include_lib("public_key/include/public_key.hrl").
-export([peer_cert_issuer/1, peer_cert_subject/1, peer_cert_validity/1]).
+-export([peer_cert_subject_item/2]).
%%--------------------------------------------------------------------------
@@ -45,9 +46,11 @@
-type(certificate() :: binary()).
--spec(peer_cert_issuer/1 :: (certificate()) -> string()).
--spec(peer_cert_subject/1 :: (certificate()) -> string()).
--spec(peer_cert_validity/1 :: (certificate()) -> string()).
+-spec(peer_cert_issuer/1 :: (certificate()) -> string()).
+-spec(peer_cert_subject/1 :: (certificate()) -> string()).
+-spec(peer_cert_validity/1 :: (certificate()) -> string()).
+-spec(peer_cert_subject_item/2 ::
+ (certificate(), tuple()) -> string() | 'not_found').
-endif.
@@ -71,6 +74,14 @@ peer_cert_subject(Cert) ->
format_rdn_sequence(Subject)
end, Cert).
+%% Return a part of the certificate's subject.
+peer_cert_subject_item(Cert, Type) ->
+ cert_info(fun(#'OTPCertificate' {
+ tbsCertificate = #'OTPTBSCertificate' {
+ subject = Subject }}) ->
+ find_by_type(Type, Subject)
+ end, Cert).
+
%% Return a string describing the certificate's validity.
peer_cert_validity(Cert) ->
cert_info(fun(#'OTPCertificate' {
@@ -89,6 +100,14 @@ cert_info(F, Cert) ->
DecCert -> DecCert %%R14B onwards
end).
+find_by_type(Type, {rdnSequence, RDNs}) ->
+ case [V || #'AttributeTypeAndValue'{type = T, value = V}
+ <- lists:flatten(RDNs),
+ T == Type] of
+ [{printableString, S}] -> S;
+ [] -> not_found
+ end.
+
%%--------------------------------------------------------------------------
%% Formatting functions
%%--------------------------------------------------------------------------
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 17f3572a..09ead22b 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -662,7 +662,7 @@ ack(AckTags, State) ->
ack(fun msg_store_remove/3,
fun ({_IsPersistent, Guid, _MsgProps}, State1) ->
remove_confirms(gb_sets:singleton(Guid), State1);
- (#msg_status{msg = #basic_message{guid = Guid}}, State1) ->
+ (#msg_status{msg = #basic_message { guid = Guid }}, State1) ->
remove_confirms(gb_sets:singleton(Guid), State1)
end,
AckTags, State),
@@ -1100,9 +1100,9 @@ blank_rate(Timestamp, IngressLength) ->
msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) ->
Self = self(),
F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
- Self, fun (StateN) -> tx_commit_post_msg_store(
- true, Pubs, AckTags,
- Fun, MsgPropsFun, StateN)
+ Self, fun (StateN) -> {[], tx_commit_post_msg_store(
+ true, Pubs, AckTags,
+ Fun, MsgPropsFun, StateN)}
end)
end,
fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler(
@@ -1315,10 +1315,8 @@ record_pending_ack(#msg_status { seq_id = SeqId,
ack_in_counter = AckInCount}) ->
{AckEntry, RAI1} =
case MsgOnDisk of
- true ->
- {{IsPersistent, Guid, MsgProps}, RAI};
- false ->
- {MsgStatus, gb_trees:insert(SeqId, Guid, RAI)}
+ true -> {{IsPersistent, Guid, MsgProps}, RAI};
+ false -> {MsgStatus, gb_trees:insert(SeqId, Guid, RAI)}
end,
PA1 = dict:store(SeqId, AckEntry, PA),
State #vqstate { pending_ack = PA1,
@@ -1329,8 +1327,8 @@ remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
- {SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3,
- {[], orddict:new()}, PA),
+ {PersistentSeqIds, GuidsByStore, _AllGuids} =
+ dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA),
State1 = State #vqstate { pending_ack = dict:new(),
ram_ack_index = gb_trees:empty() },
case KeepPersistent of
@@ -1340,18 +1338,17 @@ remove_pending_ack(KeepPersistent,
Guids),
State1
end;
- false -> IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
- ok = orddict:fold(
- fun (IsPersistent, Guids, ok) ->
- msg_store_remove(MSCState, IsPersistent, Guids)
- end, ok, GuidsByStore),
+ false -> IndexState1 =
+ rabbit_queue_index:ack(PersistentSeqIds, IndexState),
+ [ok = msg_store_remove(MSCState, IsPersistent, Guids)
+ || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)],
State1 #vqstate { index_state = IndexState1 }
end.
ack(_MsgStoreFun, _Fun, [], State) ->
{[], State};
ack(MsgStoreFun, Fun, AckTags, State) ->
- {{SeqIds, GuidsByStore},
+ {{PersistentSeqIds, GuidsByStore, AllGuids},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
persistent_count = PCount,
@@ -1365,27 +1362,30 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
pending_ack = dict:erase(SeqId, PA),
ram_ack_index =
gb_trees:delete_any(SeqId, RAI)})}
- end, {{[], orddict:new()}, State}, AckTags),
- IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
- AckdGuids = lists:concat(
- orddict:fold(
- fun (IsPersistent, Guids, Gs) ->
- MsgStoreFun(MSCState, IsPersistent, Guids),
- [Guids | Gs]
- end, [], GuidsByStore)),
+ end, {accumulate_ack_init(), State}, AckTags),
+ IndexState1 = rabbit_queue_index:ack(PersistentSeqIds, IndexState),
+ [ok = MsgStoreFun(MSCState, IsPersistent, Guids)
+ || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)],
PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
orddict:new(), GuidsByStore)),
- {AckdGuids, State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1,
- ack_out_counter = AckOutCount + length(AckTags) }}.
+ {lists:reverse(AllGuids),
+ State1 #vqstate { index_state = IndexState1,
+ persistent_count = PCount1,
+ ack_out_counter = AckOutCount + length(AckTags) }}.
+
+accumulate_ack_init() -> {[], orddict:new(), []}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
- index_on_disk = false }, Acc) ->
- Acc;
-accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps}, {SeqIdsAcc, Dict}) ->
- {cons_if(IsPersistent, SeqId, SeqIdsAcc),
- rabbit_misc:orddict_cons(IsPersistent, Guid, Dict)}.
+ index_on_disk = false,
+ guid = Guid },
+ {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) ->
+ {PersistentSeqIdsAcc, GuidsByStore, [Guid | AllGuids]};
+accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps},
+ {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) ->
+ {cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc),
+ rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore),
+ [Guid | AllGuids]}.
find_persistent_count(LensByStore) ->
case orddict:find(true, LensByStore) of
@@ -1405,13 +1405,13 @@ remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD,
unconfirmed = gb_sets:difference(UC, GuidSet) }.
msgs_confirmed(GuidSet, State) ->
- {{confirm, gb_sets:to_list(GuidSet)}, remove_confirms(GuidSet, State)}.
+ {gb_sets:to_list(GuidSet), remove_confirms(GuidSet, State)}.
msgs_written_to_disk(QPid, GuidSet) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun(State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
+ QPid, fun (State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
msgs_confirmed(gb_sets:intersection(GuidSet, MIOD),
State #vqstate {
msgs_on_disk =
@@ -1421,9 +1421,9 @@ msgs_written_to_disk(QPid, GuidSet) ->
msg_indices_written_to_disk(QPid, GuidSet) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun(State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
+ QPid, fun (State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
msgs_confirmed(gb_sets:intersection(GuidSet, MOD),
State #vqstate {
msg_indices_on_disk =