summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJerry Kuch <jerryk@vmware.com>2010-12-16 10:38:48 -0800
committerJerry Kuch <jerryk@vmware.com>2010-12-16 10:38:48 -0800
commit3a57d61a3982663b669b3ce84175e12a03d45a2c (patch)
tree251646c1d561a73eeb9df78b6016d001406e6fcd
parent4997583d3eaaaa34f49781cb50ae2a1c5b506383 (diff)
parenta3ffc09c00caee590a1249c4bb809d2af8cc818d (diff)
downloadrabbitmq-server-3a57d61a3982663b669b3ce84175e12a03d45a2c.tar.gz
Large-ish merge from default.
-rw-r--r--docs/rabbitmqctl.1.xml23
-rw-r--r--ebin/rabbit_app.in3
-rw-r--r--include/rabbit_auth_mechanism_spec.hrl41
-rwxr-xr-xscripts/rabbitmq-env2
-rwxr-xr-xscripts/rabbitmq-multi3
-rwxr-xr-xscripts/rabbitmq-server3
-rw-r--r--scripts/rabbitmq-server.bat1
-rwxr-xr-xscripts/rabbitmqctl3
-rw-r--r--src/rabbit.erl6
-rw-r--r--src/rabbit_access_control.erl62
-rw-r--r--src/rabbit_amqqueue.erl4
-rw-r--r--src/rabbit_amqqueue_process.erl65
-rw-r--r--src/rabbit_auth_mechanism.erl57
-rw-r--r--src/rabbit_auth_mechanism_amqplain.erl70
-rw-r--r--src/rabbit_auth_mechanism_cr_demo.erl74
-rw-r--r--src/rabbit_auth_mechanism_external.erl107
-rw-r--r--src/rabbit_auth_mechanism_plain.erl66
-rw-r--r--src/rabbit_backing_queue.erl2
-rw-r--r--src/rabbit_binding.erl2
-rw-r--r--src/rabbit_channel.erl147
-rw-r--r--src/rabbit_control.erl4
-rw-r--r--src/rabbit_exchange.erl6
-rw-r--r--src/rabbit_exchange_type_direct.erl6
-rw-r--r--src/rabbit_exchange_type_fanout.erl6
-rw-r--r--src/rabbit_exchange_type_headers.erl6
-rw-r--r--src/rabbit_exchange_type_topic.erl6
-rw-r--r--src/rabbit_mnesia.erl2
-rw-r--r--src/rabbit_prelaunch.erl21
-rw-r--r--src/rabbit_reader.erl132
-rw-r--r--src/rabbit_registry.erl (renamed from src/rabbit_exchange_type_registry.erl)44
-rw-r--r--src/rabbit_ssl.erl25
-rw-r--r--src/rabbit_variable_queue.erl80
32 files changed, 805 insertions, 274 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 392a479a..2419a54b 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -466,6 +466,25 @@
</varlistentry>
<varlistentry>
+ <term><cmdsynopsis><command>clear_password</command> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>username</term>
+ <listitem><para>The name of the user whose password is to be cleared.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl clear_password tonyg</screen>
+ <para role="example">
+ This command instructs the RabbitMQ broker to clear the
+ password for the user named
+ <command>tonyg</command>. This user now cannot log in with a password (but may be able to through e.g. SASL EXTERNAL if configured).
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
<term><cmdsynopsis><command>set_admin</command> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term>
<listitem>
<variablelist>
@@ -1009,6 +1028,10 @@
<listitem><para>Version of the AMQP protocol in use (currently one of <command>{0,9,1}</command> or <command>{0,8,0}</command>). Note that if a client requests an AMQP 0-9 connection, we treat it as AMQP 0-9-1.</para></listitem>
</varlistentry>
<varlistentry>
+ <term>auth_mechanism</term>
+ <listitem><para>SASL authentication mechanism used, such as <command>PLAIN</command>.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>user</term>
<listitem><para>Username associated with the connection.</para></listitem>
</varlistentry>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 6c33ef8b..3888f198 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -30,4 +30,5 @@
{default_permissions, [<<".*">>, <<".*">>, <<".*">>]},
{cluster_nodes, []},
{server_properties, []},
- {collect_statistics, none}]}]}.
+ {collect_statistics, none},
+ {auth_mechanisms, ['PLAIN', 'AMQPLAIN']} ]} ]}.
diff --git a/include/rabbit_auth_mechanism_spec.hrl b/include/rabbit_auth_mechanism_spec.hrl
new file mode 100644
index 00000000..f8dc93fe
--- /dev/null
+++ b/include/rabbit_auth_mechanism_spec.hrl
@@ -0,0 +1,41 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+-ifdef(use_specs).
+
+-spec(description/0 :: () -> [{atom(), any()}]).
+-spec(init/1 :: (rabbit_net:socket()) -> any()).
+-spec(handle_response/2 :: (binary(), any()) ->
+ {'ok', rabbit_types:user()} |
+ {'challenge', binary(), any()} |
+ {'protocol_error', string(), [any()]} |
+ {'refused', string(), [any()]}).
+
+-endif.
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index 36734874..8cb470d0 100755
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -48,6 +48,8 @@ done
SCRIPT_DIR=`dirname $SCRIPT_PATH`
RABBITMQ_HOME="${SCRIPT_DIR}/.."
+[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname`
+NODENAME=rabbit@${HOSTNAME%%.*}
# Load configuration from the rabbitmq.conf file
[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf
diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi
index 59050692..33883702 100755
--- a/scripts/rabbitmq-multi
+++ b/scripts/rabbitmq-multi
@@ -29,8 +29,7 @@
##
## Contributor(s): ______________________________________.
##
-[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s`
-NODENAME=rabbit@${HOSTNAME%%.*}
+
SCRIPT_HOME=$(dirname $0)
PIDS_FILE=/var/lib/rabbitmq/pids
MULTI_ERL_ARGS=
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 66ce4384..4155b31d 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -30,8 +30,6 @@
## Contributor(s): ______________________________________.
##
-[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s`
-NODENAME=rabbit@${HOSTNAME%%.*}
SERVER_ERL_ARGS="+K true +A30 +P 1048576 \
-kernel inet_default_listen_options [{nodelay,true}] \
-kernel inet_default_connect_options [{nodelay,true}]"
@@ -92,6 +90,7 @@ if [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then
-noinput \
-hidden \
-s rabbit_prelaunch \
+ -sname rabbitmqprelaunch$$ \
-extra "$RABBITMQ_PLUGINS_DIR" "${RABBITMQ_PLUGINS_EXPAND_DIR}" "${RABBITMQ_NODENAME}"
then
RABBITMQ_BOOT_FILE="${RABBITMQ_PLUGINS_EXPAND_DIR}/rabbit"
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 872c87e3..52a250c6 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -118,6 +118,7 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
-pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
-s rabbit_prelaunch ^
+-sname rabbitmqprelaunch%RANDOM% ^
-extra "!RABBITMQ_PLUGINS_DIR:\=/!" ^
"!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!" ^
"!RABBITMQ_NODENAME!"
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl
index 76ce25fd..56cff891 100755
--- a/scripts/rabbitmqctl
+++ b/scripts/rabbitmqctl
@@ -30,9 +30,6 @@
## Contributor(s): ______________________________________.
##
-[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s`
-NODENAME=rabbit@${HOSTNAME%%.*}
-
. `dirname $0`/rabbitmq-env
[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
diff --git a/src/rabbit.erl b/src/rabbit.erl
index ace8f286..2ebfdecf 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -69,10 +69,10 @@
-rabbit_boot_step({external_infrastructure,
[{description, "external infrastructure ready"}]}).
--rabbit_boot_step({rabbit_exchange_type_registry,
- [{description, "exchange type registry"},
+-rabbit_boot_step({rabbit_registry,
+ [{description, "plugin registry"},
{mfa, {rabbit_sup, start_child,
- [rabbit_exchange_type_registry]}},
+ [rabbit_registry]}},
{requires, external_infrastructure},
{enables, kernel_ready}]}).
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 1826347d..51adbac8 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -33,10 +33,10 @@
-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
--export([check_login/2, user_pass_login/2, check_user_pass_login/2,
+-export([user_pass_login/2, check_user_pass_login/2, make_salt/0,
check_vhost_access/2, check_resource_access/3]).
-export([add_user/2, delete_user/1, change_password/2, set_admin/1,
- clear_admin/1, list_users/0, lookup_user/1]).
+ clear_admin/1, list_users/0, lookup_user/1, clear_password/1]).
-export([change_password_hash/2, hash_password/1]).
-export([add_vhost/1, delete_vhost/1, vhost_exists/1, list_vhosts/0]).
-export([set_permissions/5, clear_permissions/2,
@@ -54,15 +54,13 @@
-type(password() :: binary()).
-type(password_hash() :: binary()).
-type(regexp() :: binary()).
--spec(check_login/2 ::
- (binary(), binary()) -> rabbit_types:user() |
- rabbit_types:channel_exit()).
-spec(user_pass_login/2 ::
(username(), password())
-> rabbit_types:user() | rabbit_types:channel_exit()).
-spec(check_user_pass_login/2 ::
(username(), password())
- -> {'ok', rabbit_types:user()} | 'refused').
+ -> {'ok', rabbit_types:user()} | {'refused', string(), [any()]}).
+-spec(make_salt/0 :: () -> binary()).
-spec(check_vhost_access/2 ::
(rabbit_types:user(), rabbit_types:vhost())
-> 'ok' | rabbit_types:channel_exit()).
@@ -72,6 +70,7 @@
-spec(add_user/2 :: (username(), password()) -> 'ok').
-spec(delete_user/1 :: (username()) -> 'ok').
-spec(change_password/2 :: (username(), password()) -> 'ok').
+-spec(clear_password/1 :: (username()) -> 'ok').
-spec(change_password_hash/2 :: (username(), password_hash()) -> 'ok').
-spec(hash_password/1 :: (password()) -> password_hash()).
-spec(set_admin/1 :: (username()) -> 'ok').
@@ -100,54 +99,26 @@
%%----------------------------------------------------------------------------
-%% SASL PLAIN, as used by the Qpid Java client and our clients. Also,
-%% apparently, by OpenAMQ.
-check_login(<<"PLAIN">>, Response) ->
- [User, Pass] = [list_to_binary(T) ||
- T <- string:tokens(binary_to_list(Response), [0])],
- user_pass_login(User, Pass);
-%% AMQPLAIN, as used by Qpid Python test suite. The 0-8 spec actually
-%% defines this as PLAIN, but in 0-9 that definition is gone, instead
-%% referring generically to "SASL security mechanism", i.e. the above.
-check_login(<<"AMQPLAIN">>, Response) ->
- LoginTable = rabbit_binary_parser:parse_table(Response),
- case {lists:keysearch(<<"LOGIN">>, 1, LoginTable),
- lists:keysearch(<<"PASSWORD">>, 1, LoginTable)} of
- {{value, {_, longstr, User}},
- {value, {_, longstr, Pass}}} ->
- user_pass_login(User, Pass);
- _ ->
- %% Is this an information leak?
- rabbit_misc:protocol_error(
- access_refused,
- "AMQPPLAIN auth info ~w is missing LOGIN or PASSWORD field",
- [LoginTable])
- end;
-
-check_login(Mechanism, _Response) ->
- rabbit_misc:protocol_error(
- access_refused, "unsupported authentication mechanism '~s'",
- [Mechanism]).
-
user_pass_login(User, Pass) ->
?LOGDEBUG("Login with user ~p pass ~p~n", [User, Pass]),
case check_user_pass_login(User, Pass) of
- refused ->
+ {refused, Msg, Args} ->
rabbit_misc:protocol_error(
- access_refused, "login refused for user '~s'", [User]);
+ access_refused, "login refused: ~s", [io_lib:format(Msg, Args)]);
{ok, U} ->
U
end.
-check_user_pass_login(User, Pass) ->
- case lookup_user(User) of
- {ok, U} ->
- case check_password(Pass, U#user.password_hash) of
- true -> {ok, U};
- _ -> refused
+check_user_pass_login(Username, Pass) ->
+ Refused = {refused, "user '~s' - invalid credentials", [Username]},
+ case lookup_user(Username) of
+ {ok, User} ->
+ case check_password(Pass, User#user.password_hash) of
+ true -> {ok, User};
+ _ -> Refused
end;
{error, not_found} ->
- refused
+ Refused
end.
internal_lookup_vhost_access(Username, VHostPath) ->
@@ -250,6 +221,9 @@ delete_user(Username) ->
change_password(Username, Password) ->
change_password_hash(Username, hash_password(Password)).
+clear_password(Username) ->
+ change_password_hash(Username, <<"">>).
+
change_password_hash(Username, PasswordHash) ->
R = update_user(Username, fun(User) ->
User#user{ password_hash = PasswordHash }
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 775c631d..71fd7a17 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -152,9 +152,9 @@
(name()) -> rabbit_types:ok_or_error('not_found') |
rabbit_types:connection_exit()).
-spec(maybe_run_queue_via_backing_queue/2 ::
- (pid(), (fun ((A) -> A | {any(), A}))) -> 'ok').
+ (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
-spec(maybe_run_queue_via_backing_queue_async/2 ::
- (pid(), (fun ((A) -> A | {any(), A}))) -> 'ok').
+ (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 78bb6835..523f7c5e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -199,6 +199,8 @@ terminate_shutdown(Fun, State) ->
BQ:tx_rollback(Txn, BQSN),
BQSN1
end, BQS, all_ch_record()),
+ [emit_consumer_deleted(Ch, CTag)
+ || {CTag, Ch, _} <- consumers(State1)],
rabbit_event:notify(queue_deleted, [{pid, self()}]),
State1#q{backing_queue_state = Fun(BQS1)}
end.
@@ -226,7 +228,7 @@ ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) ->
{ok, TRef} = timer:apply_after(
?SYNC_INTERVAL,
rabbit_amqqueue, maybe_run_queue_via_backing_queue,
- [self(), fun (BQS) -> BQ:idle_timeout(BQS) end]),
+ [self(), fun (BQS) -> {[], BQ:idle_timeout(BQS)} end]),
State#q{sync_timer_ref = TRef};
ensure_sync_timer(State) ->
State.
@@ -520,7 +522,7 @@ deliver_or_enqueue(Delivery, State) ->
requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) ->
maybe_run_queue_via_backing_queue(
fun (BQS) ->
- BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS)
+ {[], BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS)}
end, State).
fetch(AckRequired, State = #q{backing_queue_state = BQS,
@@ -536,12 +538,19 @@ remove_consumer(ChPid, ConsumerTag, Queue) ->
end, Queue).
remove_consumers(ChPid, Queue) ->
- queue:filter(fun ({CP, _}) -> CP /= ChPid end, Queue).
+ {Kept, Removed} = split_by_channel(ChPid, Queue),
+ [emit_consumer_deleted(Ch, CTag) ||
+ {Ch, #consumer{tag = CTag}} <- queue:to_list(Removed)],
+ Kept.
move_consumers(ChPid, From, To) ->
+ {Kept, Removed} = split_by_channel(ChPid, From),
+ {Kept, queue:join(To, Removed)}.
+
+split_by_channel(ChPid, Queue) ->
{Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end,
- queue:to_list(From)),
- {queue:from_list(Kept), queue:join(To, queue:from_list(Removed))}.
+ queue:to_list(Queue)),
+ {queue:from_list(Kept), queue:from_list(Removed)}.
possibly_unblock(State, ChPid, Update) ->
case lookup_ch(ChPid) of
@@ -617,12 +626,9 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
- {BQS2, State1} =
- case Fun(BQS) of
- {{confirm, Guids}, BQS1} -> {BQS1, confirm_messages(Guids, State)};
- BQS1 -> {BQS1, State}
- end,
- run_message_queue(State1#q{backing_queue_state = BQS2}).
+ {Guids, BQS1} = Fun(BQS),
+ run_message_queue(
+ confirm_messages(Guids, State#q{backing_queue_state = BQS1})).
commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
backing_queue_state = BQS,
@@ -724,12 +730,34 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
i(Item, _) ->
throw({bad_argument, Item}).
+consumers(#q{active_consumers = ActiveConsumers,
+ blocked_consumers = BlockedConsumers}) ->
+ rabbit_misc:queue_fold(
+ fun ({ChPid, #consumer{tag = ConsumerTag,
+ ack_required = AckRequired}}, Acc) ->
+ [{ChPid, ConsumerTag, AckRequired} | Acc]
+ end, [], queue:join(ActiveConsumers, BlockedConsumers)).
+
emit_stats(State) ->
emit_stats(State, []).
emit_stats(State, Extra) ->
rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)).
+emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) ->
+ rabbit_event:notify(consumer_created,
+ [{consumer_tag, ConsumerTag},
+ {exclusive, Exclusive},
+ {ack_required, AckRequired},
+ {channel, ChPid},
+ {queue, self()}]).
+
+emit_consumer_deleted(ChPid, ConsumerTag) ->
+ rabbit_event:notify(consumer_deleted,
+ [{consumer_tag, ConsumerTag},
+ {channel, ChPid},
+ {queue, self()}]).
+
%---------------------------------------------------------------------------
prioritise_call(Msg, _From, _State) ->
@@ -792,14 +820,8 @@ handle_call({info, Items}, _From, State) ->
catch Error -> reply({error, Error}, State)
end;
-handle_call(consumers, _From,
- State = #q{active_consumers = ActiveConsumers,
- blocked_consumers = BlockedConsumers}) ->
- reply(rabbit_misc:queue_fold(
- fun ({ChPid, #consumer{tag = ConsumerTag,
- ack_required = AckRequired}}, Acc) ->
- [{ChPid, ConsumerTag, AckRequired} | Acc]
- end, [], queue:join(ActiveConsumers, BlockedConsumers)), State);
+handle_call(consumers, _From, State) ->
+ reply(consumers(State), State);
handle_call({deliver_immediately, Delivery = #delivery{message = Message}},
_From, State) ->
@@ -902,6 +924,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid,
ChPid, Consumer,
State1#q.active_consumers)})
end,
+ emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
+ not NoAck),
reply(ok, State2)
end;
@@ -920,6 +944,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
C1#cr{limiter_pid = undefined};
_ -> C1
end),
+ emit_consumer_deleted(ChPid, ConsumerTag),
ok = maybe_send_reply(ChPid, OkMsg),
NewState =
State#q{exclusive_consumer = cancel_holder(ChPid,
@@ -1107,7 +1132,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_info(timeout, State = #q{backing_queue = BQ}) ->
noreply(maybe_run_queue_via_backing_queue(
- fun (BQS) -> BQ:idle_timeout(BQS) end, State));
+ fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State));
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl
new file mode 100644
index 00000000..ce1b16ac
--- /dev/null
+++ b/src/rabbit_auth_mechanism.erl
@@ -0,0 +1,57 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_mechanism).
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [
+ %% A description.
+ {description, 0},
+
+ %% Called before authentication starts. Should create a state
+ %% object to be passed through all the stages of authentication.
+ {init, 1},
+
+ %% Handle a stage of authentication. Possible responses:
+ %% {ok, User}
+ %% Authentication succeeded, and here's the user record.
+ %% {challenge, Challenge, NextState}
+ %% Another round is needed. Here's the state I want next time.
+ %% {protocol_error, Msg, Args}
+ %% Client got the protocol wrong. Log and die.
+ %% {refused, Msg, Args}
+ %% Client failed authentication. Log and die.
+ {handle_response, 2}
+ ];
+behaviour_info(_Other) ->
+ undefined.
diff --git a/src/rabbit_auth_mechanism_amqplain.erl b/src/rabbit_auth_mechanism_amqplain.erl
new file mode 100644
index 00000000..5d51d904
--- /dev/null
+++ b/src/rabbit_auth_mechanism_amqplain.erl
@@ -0,0 +1,70 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_mechanism_amqplain).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_auth_mechanism).
+
+-export([description/0, init/1, handle_response/2]).
+
+-include("rabbit_auth_mechanism_spec.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "auth mechanism amqplain"},
+ {mfa, {rabbit_registry, register,
+ [auth_mechanism, <<"AMQPLAIN">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+%% AMQPLAIN, as used by Qpid Python test suite. The 0-8 spec actually
+%% defines this as PLAIN, but in 0-9 that definition is gone, instead
+%% referring generically to "SASL security mechanism", i.e. the above.
+
+description() ->
+ [{name, <<"AMQPLAIN">>},
+ {description, <<"QPid AMQPLAIN mechanism">>}].
+
+init(_Sock) ->
+ [].
+
+handle_response(Response, _State) ->
+ LoginTable = rabbit_binary_parser:parse_table(Response),
+ case {lists:keysearch(<<"LOGIN">>, 1, LoginTable),
+ lists:keysearch(<<"PASSWORD">>, 1, LoginTable)} of
+ {{value, {_, longstr, User}},
+ {value, {_, longstr, Pass}}} ->
+ rabbit_access_control:check_user_pass_login(User, Pass);
+ _ ->
+ {protocol_error,
+ "AMQPLAIN auth info ~w is missing LOGIN or PASSWORD field",
+ [LoginTable]}
+ end.
diff --git a/src/rabbit_auth_mechanism_cr_demo.erl b/src/rabbit_auth_mechanism_cr_demo.erl
new file mode 100644
index 00000000..67665928
--- /dev/null
+++ b/src/rabbit_auth_mechanism_cr_demo.erl
@@ -0,0 +1,74 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_mechanism_cr_demo).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_auth_mechanism).
+
+-export([description/0, init/1, handle_response/2]).
+
+-include("rabbit_auth_mechanism_spec.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "auth mechanism cr-demo"},
+ {mfa, {rabbit_registry, register,
+ [auth_mechanism, <<"RABBIT-CR-DEMO">>,
+ ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+-record(state, {username = undefined}).
+
+%% Provides equivalent security to PLAIN but demos use of Connection.Secure(Ok)
+%% START-OK: Username
+%% SECURE: "Please tell me your password"
+%% SECURE-OK: "My password is ~s", [Password]
+
+description() ->
+ [{name, <<"RABBIT-CR-DEMO">>},
+ {description, <<"RabbitMQ Demo challenge-response authentication "
+ "mechanism">>}].
+
+init(_Sock) ->
+ #state{}.
+
+handle_response(Response, State = #state{username = undefined}) ->
+ {challenge, <<"Please tell me your password">>,
+ State#state{username = Response}};
+
+handle_response(Response, #state{username = Username}) ->
+ case Response of
+ <<"My password is ", Password/binary>> ->
+ rabbit_access_control:check_user_pass_login(Username, Password);
+ _ ->
+ {protocol_error, "Invalid response '~s'", [Response]}
+ end.
diff --git a/src/rabbit_auth_mechanism_external.erl b/src/rabbit_auth_mechanism_external.erl
new file mode 100644
index 00000000..6572f786
--- /dev/null
+++ b/src/rabbit_auth_mechanism_external.erl
@@ -0,0 +1,107 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_mechanism_external).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_auth_mechanism).
+
+-export([description/0, init/1, handle_response/2]).
+
+-include("rabbit_auth_mechanism_spec.hrl").
+
+-include_lib("public_key/include/public_key.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "auth mechanism external"},
+ {mfa, {rabbit_registry, register,
+ [auth_mechanism, <<"EXTERNAL">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+-record(state, {username = undefined}).
+
+%% SASL EXTERNAL. SASL says EXTERNAL means "use credentials
+%% established by means external to the mechanism". We define that to
+%% mean the peer certificate's subject's CN.
+
+description() ->
+ [{name, <<"EXTERNAL">>},
+ {description, <<"SASL EXTERNAL authentication mechanism">>}].
+
+init(Sock) ->
+ Username = case rabbit_net:peercert(Sock) of
+ {ok, C} ->
+ CN = case rabbit_ssl:peer_cert_subject_item(
+ C, ?'id-at-commonName') of
+ not_found -> {refused, "no CN found", []};
+ CN0 -> list_to_binary(CN0)
+ end,
+ case config_sane() of
+ true -> CN;
+ false -> {refused, "configuration unsafe", []}
+ end;
+ {error, no_peercert} ->
+ {refused, "no peer certificate", []};
+ nossl ->
+ {refused, "not SSL connection", []}
+ end,
+ #state{username = Username}.
+
+handle_response(_Response, #state{username = Username}) ->
+ case Username of
+ {refused, _, _} = E ->
+ E;
+ _ ->
+ case rabbit_access_control:lookup_user(Username) of
+ {ok, User} ->
+ {ok, User};
+ {error, not_found} ->
+ %% This is not an information leak as we have to
+ %% have validated a client cert to get this far.
+ {refused, "user '~s' not found", [Username]}
+ end
+ end.
+
+%%--------------------------------------------------------------------------
+
+config_sane() ->
+ {ok, Opts} = application:get_env(ssl_options),
+ case {proplists:get_value(fail_if_no_peer_cert, Opts),
+ proplists:get_value(verify, Opts)} of
+ {true, verify_peer} ->
+ true;
+ {F, V} ->
+ rabbit_log:warning("EXTERNAL mechanism disabled, "
+ "fail_if_no_peer_cert=~p; "
+ "verify=~p~n", [F, V]),
+ false
+ end.
diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl
new file mode 100644
index 00000000..e5f8f3e6
--- /dev/null
+++ b/src/rabbit_auth_mechanism_plain.erl
@@ -0,0 +1,66 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_auth_mechanism_plain).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_auth_mechanism).
+
+-export([description/0, init/1, handle_response/2]).
+
+-include("rabbit_auth_mechanism_spec.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "auth mechanism plain"},
+ {mfa, {rabbit_registry, register,
+ [auth_mechanism, <<"PLAIN">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+%% SASL PLAIN, as used by the Qpid Java client and our clients. Also,
+%% apparently, by OpenAMQ.
+
+description() ->
+ [{name, <<"PLAIN">>},
+ {description, <<"SASL PLAIN authentication mechanism">>}].
+
+init(_Sock) ->
+ [].
+
+handle_response(Response, _State) ->
+ %% The '%%"' at the end of the next line is for Emacs
+ case re:run(Response, "^\\0([^\\0]*)\\0([^\\0]*)$",%%"
+ [{capture, all_but_first, binary}]) of
+ {match, [User, Pass]} ->
+ rabbit_access_control:check_user_pass_login(User, Pass);
+ _ ->
+ {protocol_error, "response ~p invalid", [Response]}
+ end.
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 352e76fd..8603d8d7 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -77,7 +77,7 @@ behaviour_info(callbacks) ->
{fetch, 2},
%% Acktags supplied are for messages which can now be forgotten
- %% about.
+ %% about. Must return 1 guid per Ack, in the same order as Acks.
{ack, 2},
%% A publish, but in the context of a transaction.
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 668fb9bb..ccadf5af 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -305,7 +305,7 @@ table_for_resource(#resource{kind = queue}) -> rabbit_queue.
%% Used with atoms from records; e.g., the type is expected to exist.
type_to_module(T) ->
- {ok, Module} = rabbit_exchange_type_registry:lookup_module(T),
+ {ok, Module} = rabbit_registry:lookup_module(exchange, T),
Module.
contains(Table, MatchHead) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 162a62fa..d6cdf50d 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -35,10 +35,10 @@
-behaviour(gen_server2).
--export([start_link/7, do/2, do/3, shutdown/1]).
--export([send_command/2, deliver/4, flushed/2]).
+-export([start_link/7, do/2, do/3, flush/1, shutdown/1]).
+-export([send_command/2, deliver/4, flushed/2, confirm/2, flush_confirms/1]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
--export([emit_stats/1, flush/1, flush_multiple_acks/1, confirm/2]).
+-export([emit_stats/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
@@ -72,7 +72,7 @@
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
--define(FLUSH_MULTIPLE_ACKS_INTERVAL, 1000).
+-define(FLUSH_CONFIRMS_INTERVAL, 1000).
%%----------------------------------------------------------------------------
@@ -90,12 +90,15 @@
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
+-spec(flush/1 :: (pid()) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
-spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(deliver/4 ::
(pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
-> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
+-spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok').
+-spec(flush_confirms/1 :: (pid()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (pid()) -> rabbit_types:infos()).
@@ -103,8 +106,6 @@
-spec(info_all/0 :: () -> [rabbit_types:infos()]).
-spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]).
-spec(emit_stats/1 :: (pid()) -> 'ok').
--spec(flush_multiple_acks/1 :: (pid()) -> 'ok').
--spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok').
-endif.
@@ -121,6 +122,9 @@ do(Pid, Method) ->
do(Pid, Method, Content) ->
gen_server2:cast(Pid, {method, Method, Content}).
+flush(Pid) ->
+ gen_server2:call(Pid, flush).
+
shutdown(Pid) ->
gen_server2:cast(Pid, terminate).
@@ -133,6 +137,12 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
+confirm(Pid, MsgSeqNo) ->
+ gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}).
+
+flush_confirms(Pid) ->
+ gen_server2:cast(Pid, flush_confirms).
+
list() ->
pg_local:get_members(rabbit_channels).
@@ -156,15 +166,6 @@ info_all(Items) ->
emit_stats(Pid) ->
gen_server2:cast(Pid, emit_stats).
-flush(Pid) ->
- gen_server2:call(Pid, flush).
-
-flush_multiple_acks(Pid) ->
- gen_server2:cast(Pid, flush_multiple_acks).
-
-confirm(Pid, MsgSeqNo) ->
- gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}).
-
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
@@ -215,6 +216,9 @@ prioritise_cast(Msg, _State) ->
_ -> 0
end.
+handle_call(flush, _From, State) ->
+ reply(ok, State);
+
handle_call(info, _From, State) ->
reply(infos(?INFO_KEYS, State), State);
@@ -224,9 +228,6 @@ handle_call({info, Items}, _From, State) ->
catch Error -> reply({error, Error}, State)
end;
-handle_call(flush, _From, State) ->
- reply(ok, State);
-
handle_call(_Request, _From, State) ->
noreply(State).
@@ -261,10 +262,10 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
noreply(State);
handle_cast({deliver, ConsumerTag, AckRequired,
- Msg = {_QName, QPid, _MsgId, Redelivered,
- #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
- content = Content}}},
+ Msg = {_QName, QPid, _MsgId, Redelivered,
+ #basic_message{exchange_name = ExchangeName,
+ routing_key = RoutingKey,
+ content = Content}}},
State = #ch{writer_pid = WriterPid,
next_tag = DeliveryTag}) ->
State1 = lock_message(AckRequired,
@@ -291,11 +292,11 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)},
hibernate};
-handle_cast(flush_multiple_acks, State) ->
- {noreply, flush_multiple(State)};
+handle_cast(flush_confirms, State) ->
+ {noreply, internal_flush_confirms(State)};
handle_cast({confirm, MsgSeqNo, From}, State) ->
- {noreply, send_or_enqueue_ack(MsgSeqNo, From, State)}.
+ {noreply, confirm(MsgSeqNo, From, State)}.
handle_info({'DOWN', _MRef, process, QPid, _Reason},
State = #ch{queues_for_msg = QFM}) ->
@@ -303,7 +304,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) ->
Qs = sets:del_element(QPid, QPids),
case sets:size(Qs) of
- 0 -> send_or_enqueue_ack(Msg, QPid, State0);
+ 0 -> confirm(Msg, QPid, State0);
_ -> State0#ch{queues_for_msg =
dict:store(Msg, Qs, QFM0)}
end
@@ -313,7 +314,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
- State1 = flush_multiple(State),
+ State1 = internal_flush_confirms(State),
rabbit_event:if_enabled(StatsTimer,
fun () ->
internal_emit_stats(
@@ -472,11 +473,11 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-send_or_enqueue_ack(undefined, _QPid, State) ->
+confirm(undefined, _QPid, State) ->
State;
-send_or_enqueue_ack(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) ->
+confirm(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) ->
State;
-send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) ->
+confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) ->
do_if_unconfirmed(MsgSeqNo, QPid,
fun(MSN, State1 = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(
@@ -484,7 +485,7 @@ send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) ->
delivery_tag = MSN}),
State1
end, State);
-send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) ->
+confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) ->
do_if_unconfirmed(MsgSeqNo, QPid,
fun(MSN, State1 = #ch{held_confirms = As}) ->
start_confirm_timer(
@@ -1240,12 +1241,12 @@ is_message_persistent(Content) ->
process_routing_result(unroutable, _, MsgSeqNo, Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_route),
- send_or_enqueue_ack(MsgSeqNo, undefined, State);
+ confirm(MsgSeqNo, undefined, State);
process_routing_result(not_delivered, _, MsgSeqNo, Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_consumers),
- send_or_enqueue_ack(MsgSeqNo, undefined, State);
+ confirm(MsgSeqNo, undefined, State);
process_routing_result(routed, [], MsgSeqNo, _, State) ->
- send_or_enqueue_ack(MsgSeqNo, undefined, State);
+ confirm(MsgSeqNo, undefined, State);
process_routing_result(routed, _, undefined, _, State) ->
State;
process_routing_result(routed, QPids, MsgSeqNo, _,
@@ -1259,6 +1260,45 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
lock_message(false, _MsgStruct, State) ->
State.
+start_confirm_timer(State = #ch{confirm_tref = undefined}) ->
+ {ok, TRef} = timer:apply_after(?FLUSH_CONFIRMS_INTERVAL,
+ ?MODULE, flush_confirms, [self()]),
+ State#ch{confirm_tref = TRef};
+start_confirm_timer(State) ->
+ State.
+
+stop_confirm_timer(State = #ch{confirm_tref = undefined}) ->
+ State;
+stop_confirm_timer(State = #ch{confirm_tref = TRef}) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State#ch{confirm_tref = undefined}.
+
+internal_flush_confirms(State = #ch{writer_pid = WriterPid,
+ held_confirms = Cs}) ->
+ case gb_sets:is_empty(Cs) of
+ true -> State#ch{confirm_tref = undefined};
+ false -> [First | Rest] = gb_sets:to_list(Cs),
+ {Mult, Inds} = find_consecutive_sequence(First, Rest),
+ ok = rabbit_writer:send_command(
+ WriterPid,
+ #'basic.ack'{delivery_tag = Mult, multiple = true}),
+ ok = lists:foldl(
+ fun(T, ok) -> rabbit_writer:send_command(
+ WriterPid,
+ #'basic.ack'{delivery_tag = T})
+ end, ok, Inds),
+ State#ch{held_confirms = gb_sets:new(),
+ confirm_tref = undefined}
+ end.
+
+%% Find longest sequence of consecutive numbers at the beginning.
+find_consecutive_sequence(Last, []) ->
+ {Last, []};
+find_consecutive_sequence(Last, [N | Ns]) when N == (Last + 1) ->
+ find_consecutive_sequence(N, Ns);
+find_consecutive_sequence(Last, Ns) ->
+ {Last, Ns}.
+
terminate(State) ->
stop_confirm_timer(State),
pg_local:leave(rabbit_channels, self()),
@@ -1346,42 +1386,3 @@ erase_queue_stats(QPid) ->
erase({queue_stats, QPid}),
[erase({queue_exchange_stats, QX}) ||
{{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0].
-
-start_confirm_timer(State = #ch{confirm_tref = undefined}) ->
- {ok, TRef} = timer:apply_after(?FLUSH_MULTIPLE_ACKS_INTERVAL,
- ?MODULE, flush_multiple_acks, [self()]),
- State#ch{confirm_tref = TRef};
-start_confirm_timer(State) ->
- State.
-
-stop_confirm_timer(State = #ch{confirm_tref = undefined}) ->
- State;
-stop_confirm_timer(State = #ch{confirm_tref = TRef}) ->
- {ok, cancel} = timer:cancel(TRef),
- State#ch{confirm_tref = undefined}.
-
-flush_multiple(State = #ch{writer_pid = WriterPid,
- held_confirms = Cs}) ->
- case gb_sets:is_empty(Cs) of
- true -> State#ch{confirm_tref = undefined};
- false -> [First | Rest] = gb_sets:to_list(Cs),
- {Mult, Inds} = find_consecutive_sequence(First, Rest),
- ok = rabbit_writer:send_command(
- WriterPid,
- #'basic.ack'{delivery_tag = Mult, multiple = true}),
- ok = lists:foldl(
- fun(T, ok) -> rabbit_writer:send_command(
- WriterPid,
- #'basic.ack'{delivery_tag = T})
- end, ok, Inds),
- State#ch{held_confirms = gb_sets:new(),
- confirm_tref = undefined}
- end.
-
-%% Find longest sequence of consecutive numbers at the beginning.
-find_consecutive_sequence(Last, []) ->
- {Last, []};
-find_consecutive_sequence(Last, [N | Ns]) when N == (Last + 1) ->
- find_consecutive_sequence(N, Ns);
-find_consecutive_sequence(Last, Ns) ->
- {Last, Ns}.
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 360217a2..df55d961 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -211,6 +211,10 @@ action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) ->
Inform("Changing password for user ~p", [Username]),
call(Node, {rabbit_access_control, change_password, Args});
+action(clear_password, Node, Args = [Username], _Opts, Inform) ->
+ Inform("Clearing password for user ~p", [Username]),
+ call(Node, {rabbit_access_control, clear_password, Args});
+
action(set_admin, Node, [Username], _Opts, Inform) ->
Inform("Setting administrative status for user ~p", [Username]),
call(Node, {rabbit_access_control, set_admin, [Username]});
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 501af585..a95cf0b1 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -152,17 +152,17 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
%% Used with atoms from records; e.g., the type is expected to exist.
type_to_module(T) ->
- {ok, Module} = rabbit_exchange_type_registry:lookup_module(T),
+ {ok, Module} = rabbit_registry:lookup_module(exchange, T),
Module.
%% Used with binaries sent over the wire; the type may not exist.
check_type(TypeBin) ->
- case rabbit_exchange_type_registry:binary_to_type(TypeBin) of
+ case rabbit_registry:binary_to_type(TypeBin) of
{error, not_found} ->
rabbit_misc:protocol_error(
command_invalid, "unknown exchange type '~s'", [TypeBin]);
T ->
- case rabbit_exchange_type_registry:lookup_module(T) of
+ case rabbit_registry:lookup_module(exchange, T) of
{error, not_found} -> rabbit_misc:protocol_error(
command_invalid,
"invalid exchange type '~s'", [T]);
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index d934a497..d49d0199 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -41,9 +41,9 @@
-rabbit_boot_step({?MODULE,
[{description, "exchange type direct"},
- {mfa, {rabbit_exchange_type_registry, register,
- [<<"direct">>, ?MODULE]}},
- {requires, rabbit_exchange_type_registry},
+ {mfa, {rabbit_registry, register,
+ [exchange, <<"direct">>, ?MODULE]}},
+ {requires, rabbit_registry},
{enables, kernel_ready}]}).
description() ->
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index 77ca9686..e7f75464 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -41,9 +41,9 @@
-rabbit_boot_step({?MODULE,
[{description, "exchange type fanout"},
- {mfa, {rabbit_exchange_type_registry, register,
- [<<"fanout">>, ?MODULE]}},
- {requires, rabbit_exchange_type_registry},
+ {mfa, {rabbit_registry, register,
+ [exchange, <<"fanout">>, ?MODULE]}},
+ {requires, rabbit_registry},
{enables, kernel_ready}]}).
description() ->
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index ec9e7ba4..caf141fe 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -42,9 +42,9 @@
-rabbit_boot_step({?MODULE,
[{description, "exchange type headers"},
- {mfa, {rabbit_exchange_type_registry, register,
- [<<"headers">>, ?MODULE]}},
- {requires, rabbit_exchange_type_registry},
+ {mfa, {rabbit_registry, register,
+ [exchange, <<"headers">>, ?MODULE]}},
+ {requires, rabbit_registry},
{enables, kernel_ready}]}).
-ifdef(use_specs).
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index d3ecdd4d..44851858 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -41,9 +41,9 @@
-rabbit_boot_step({?MODULE,
[{description, "exchange type topic"},
- {mfa, {rabbit_exchange_type_registry, register,
- [<<"topic">>, ?MODULE]}},
- {requires, rabbit_exchange_type_registry},
+ {mfa, {rabbit_registry, register,
+ [exchange, <<"topic">>, ?MODULE]}},
+ {requires, rabbit_registry},
{enables, kernel_ready}]}).
-export([topic_matches/2]).
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index a62e7a6f..dadfc16e 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -388,7 +388,7 @@ init_db(ClusterNodes, Force) ->
ensure_version_ok(rabbit_upgrade:read_version()),
ensure_schema_ok();
{[], false, _} ->
- %% First RAM node in cluster, start from scratch
+ %% Nothing there at all, start from scratch
ok = create_schema();
{[AnotherNode|_], _, _} ->
%% Subsequent node in cluster, catch up
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index 867ecb12..8ae45abd 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -53,7 +53,7 @@ start() ->
io:format("Activating RabbitMQ plugins ...~n"),
%% Determine our various directories
- [PluginDir, UnpackedPluginDir, Node] = init:get_plain_arguments(),
+ [PluginDir, UnpackedPluginDir, NodeStr] = init:get_plain_arguments(),
RootName = UnpackedPluginDir ++ "/rabbit",
%% Unpack any .ez plugins
@@ -132,7 +132,7 @@ start() ->
|| App <- PluginApps],
io:nl(),
- ok = duplicate_node_check(Node),
+ ok = duplicate_node_check(NodeStr),
terminate(0),
ok.
@@ -259,7 +259,8 @@ process_entry(Entry) ->
duplicate_node_check([]) ->
%% Ignore running node while installing windows service
ok;
-duplicate_node_check(Node) ->
+duplicate_node_check(NodeStr) ->
+ Node = rabbit_misc:makenode(NodeStr),
{NodeName, NodeHost} = rabbit_misc:nodeparts(Node),
case net_adm:names(NodeHost) of
{ok, NamePorts} ->
@@ -272,7 +273,6 @@ duplicate_node_check(Node) ->
terminate(?ERROR_CODE);
false -> ok
end;
- {error, address} -> ok;
{error, EpmdReason} -> terminate("unexpected epmd error: ~p~n",
[EpmdReason])
end.
@@ -283,12 +283,9 @@ terminate(Fmt, Args) ->
terminate(Status) ->
case os:type() of
- {unix, _} ->
- halt(Status);
- {win32, _} ->
- init:stop(Status),
- receive
- after infinity -> ok
- end
+ {unix, _} -> halt(Status);
+ {win32, _} -> init:stop(Status),
+ receive
+ after infinity -> ok
+ end
end.
-
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 4dd150a2..92a2f4d7 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -56,14 +56,15 @@
-record(v1, {parent, sock, connection, callback, recv_length, recv_ref,
connection_state, queue_collector, heartbeater, stats_timer,
- channel_sup_sup_pid, start_heartbeat_fun}).
+ channel_sup_sup_pid, start_heartbeat_fun, auth_mechanism,
+ auth_state}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, channels]).
-define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl,
peer_cert_subject, peer_cert_issuer,
- peer_cert_validity,
+ peer_cert_validity, auth_mechanism,
protocol, user, vhost, timeout, frame_max,
client_properties]).
@@ -294,7 +295,9 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
stats_timer =
rabbit_event:init_stats_timer(),
channel_sup_sup_pid = ChannelSupSupPid,
- start_heartbeat_fun = StartHeartbeatFun
+ start_heartbeat_fun = StartHeartbeatFun,
+ auth_mechanism = none,
+ auth_state = none
},
handshake, 8))
catch
@@ -681,11 +684,12 @@ handle_input(Callback, Data, _State) ->
start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
Protocol,
State = #v1{sock = Sock, connection = Connection}) ->
- Start = #'connection.start'{ version_major = ProtocolMajor,
- version_minor = ProtocolMinor,
- server_properties = server_properties(),
- mechanisms = <<"PLAIN AMQPLAIN">>,
- locales = <<"en_US">> },
+ Start = #'connection.start'{
+ version_major = ProtocolMajor,
+ version_minor = ProtocolMinor,
+ server_properties = server_properties(),
+ mechanisms = auth_mechanisms_binary(),
+ locales = <<"en_US">> },
ok = send_on_channel0(Sock, Start, Protocol),
switch_callback(State#v1{connection = Connection#connection{
timeout_sec = ?NORMAL_TIMEOUT,
@@ -710,42 +714,45 @@ ensure_stats_timer(State) ->
handle_method0(MethodName, FieldsBin,
State = #v1{connection = #connection{protocol = Protocol}}) ->
- try
- handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
- State)
- catch exit:Reason ->
- CompleteReason = case Reason of
- #amqp_error{method = none} ->
- Reason#amqp_error{method = MethodName};
- OtherReason -> OtherReason
- end,
+ HandleException =
+ fun(R) ->
case ?IS_RUNNING(State) of
- true -> send_exception(State, 0, CompleteReason);
+ true -> send_exception(State, 0, R);
%% We don't trust the client at this point - force
%% them to wait for a bit so they can't DOS us with
%% repeated failed logins etc.
false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
- throw({channel0_error, State#v1.connection_state,
- CompleteReason})
+ throw({channel0_error, State#v1.connection_state, R})
end
+ end,
+ try
+ handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
+ State)
+ catch exit:#amqp_error{method = none} = Reason ->
+ HandleException(Reason#amqp_error{method = MethodName});
+ Type:Reason ->
+ HandleException({Type, Reason, MethodName, erlang:get_stacktrace()})
end.
handle_method0(#'connection.start_ok'{mechanism = Mechanism,
response = Response,
client_properties = ClientProperties},
- State = #v1{connection_state = starting,
- connection = Connection =
- #connection{protocol = Protocol},
- sock = Sock}) ->
- User = rabbit_access_control:check_login(Mechanism, Response),
- Tune = #'connection.tune'{channel_max = 0,
- frame_max = ?FRAME_MAX,
- heartbeat = 0},
- ok = send_on_channel0(Sock, Tune, Protocol),
- State#v1{connection_state = tuning,
- connection = Connection#connection{
- user = User,
- client_properties = ClientProperties}};
+ State0 = #v1{connection_state = starting,
+ connection = Connection,
+ sock = Sock}) ->
+ AuthMechanism = auth_mechanism_to_module(Mechanism),
+ State = State0#v1{auth_mechanism = AuthMechanism,
+ auth_state = AuthMechanism:init(Sock),
+ connection_state = securing,
+ connection =
+ Connection#connection{
+ client_properties = ClientProperties}},
+ auth_phase(Response, State);
+
+handle_method0(#'connection.secure_ok'{response = Response},
+ State = #v1{connection_state = securing}) ->
+ auth_phase(Response, State);
+
handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
heartbeat = ClientHeartbeat},
State = #v1{connection_state = tuning,
@@ -827,6 +834,61 @@ handle_method0(_Method, #v1{connection_state = S}) ->
send_on_channel0(Sock, Method, Protocol) ->
ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol).
+auth_mechanism_to_module(TypeBin) ->
+ case rabbit_registry:binary_to_type(TypeBin) of
+ {error, not_found} ->
+ rabbit_misc:protocol_error(
+ command_invalid, "unknown authentication mechanism '~s'",
+ [TypeBin]);
+ T ->
+ case {lists:member(T, auth_mechanisms()),
+ rabbit_registry:lookup_module(auth_mechanism, T)} of
+ {true, {ok, Module}} ->
+ Module;
+ _ ->
+ rabbit_misc:protocol_error(
+ command_invalid,
+ "invalid authentication mechanism '~s'", [T])
+ end
+ end.
+
+auth_mechanisms() ->
+ {ok, Configured} = application:get_env(auth_mechanisms),
+ [Name || {Name, _Module} <- rabbit_registry:lookup_all(auth_mechanism),
+ lists:member(Name, Configured)].
+
+auth_mechanisms_binary() ->
+ list_to_binary(
+ string:join(
+ [atom_to_list(A) || A <- auth_mechanisms()], " ")).
+
+auth_phase(Response,
+ State = #v1{auth_mechanism = AuthMechanism,
+ auth_state = AuthState,
+ connection = Connection =
+ #connection{protocol = Protocol},
+ sock = Sock}) ->
+ case AuthMechanism:handle_response(Response, AuthState) of
+ {refused, Msg, Args} ->
+ rabbit_misc:protocol_error(
+ access_refused, "~s login refused: ~s",
+ [proplists:get_value(name, AuthMechanism:description()),
+ io_lib:format(Msg, Args)]);
+ {protocol_error, Msg, Args} ->
+ rabbit_misc:protocol_error(syntax_error, Msg, Args);
+ {challenge, Challenge, AuthState1} ->
+ Secure = #'connection.secure'{challenge = Challenge},
+ ok = send_on_channel0(Sock, Secure, Protocol),
+ State#v1{auth_state = AuthState1};
+ {ok, User} ->
+ Tune = #'connection.tune'{channel_max = 0,
+ frame_max = ?FRAME_MAX,
+ heartbeat = 0},
+ ok = send_on_channel0(Sock, Tune, Protocol),
+ State#v1{connection_state = tuning,
+ connection = Connection#connection{user = User}}
+ end.
+
%%--------------------------------------------------------------------------
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
@@ -864,6 +926,10 @@ i(protocol, #v1{connection = #connection{protocol = none}}) ->
none;
i(protocol, #v1{connection = #connection{protocol = Protocol}}) ->
Protocol:version();
+i(auth_mechanism, #v1{auth_mechanism = none}) ->
+ none;
+i(auth_mechanism, #v1{auth_mechanism = Mechanism}) ->
+ proplists:get_value(name, Mechanism:description());
i(user, #v1{connection = #connection{user = #user{username = Username}}}) ->
Username;
i(user, #v1{connection = #connection{user = none}}) ->
diff --git a/src/rabbit_exchange_type_registry.erl b/src/rabbit_registry.erl
index f15275b5..7a3fcb51 100644
--- a/src/rabbit_exchange_type_registry.erl
+++ b/src/rabbit_registry.erl
@@ -29,7 +29,7 @@
%% Contributor(s): ______________________________________.
%%
--module(rabbit_exchange_type_registry).
+-module(rabbit_registry).
-behaviour(gen_server).
@@ -38,7 +38,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--export([register/2, binary_to_type/1, lookup_module/1]).
+-export([register/3, binary_to_type/1, lookup_module/2, lookup_all/1]).
-define(SERVER, ?MODULE).
-define(ETS_NAME, ?MODULE).
@@ -46,11 +46,12 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(register/2 :: (binary(), atom()) -> 'ok').
+-spec(register/3 :: (atom(), binary(), atom()) -> 'ok').
-spec(binary_to_type/1 ::
(binary()) -> atom() | rabbit_types:error('not_found')).
--spec(lookup_module/1 ::
- (atom()) -> rabbit_types:ok_or_error2(atom(), 'not_found')).
+-spec(lookup_module/2 ::
+ (atom(), atom()) -> rabbit_types:ok_or_error2(atom(), 'not_found')).
+-spec(lookup_all/1 :: (atom()) -> [{atom(), atom()}]).
-endif.
@@ -61,8 +62,8 @@ start_link() ->
%%---------------------------------------------------------------------------
-register(TypeName, ModuleName) ->
- gen_server:call(?SERVER, {register, TypeName, ModuleName}).
+register(Class, TypeName, ModuleName) ->
+ gen_server:call(?SERVER, {register, Class, TypeName, ModuleName}).
%% This is used with user-supplied arguments (e.g., on exchange
%% declare), so we restrict it to existing atoms only. This means it
@@ -74,47 +75,54 @@ binary_to_type(TypeBin) when is_binary(TypeBin) ->
TypeAtom -> TypeAtom
end.
-lookup_module(T) when is_atom(T) ->
- case ets:lookup(?ETS_NAME, T) of
+lookup_module(Class, T) when is_atom(T) ->
+ case ets:lookup(?ETS_NAME, {Class, T}) of
[{_, Module}] ->
{ok, Module};
[] ->
{error, not_found}
end.
+lookup_all(Class) ->
+ [{K, V} || [K, V] <- ets:match(?ETS_NAME, {{Class, '$1'}, '$2'})].
+
%%---------------------------------------------------------------------------
internal_binary_to_type(TypeBin) when is_binary(TypeBin) ->
list_to_atom(binary_to_list(TypeBin)).
-internal_register(TypeName, ModuleName)
- when is_binary(TypeName), is_atom(ModuleName) ->
- ok = sanity_check_module(ModuleName),
+internal_register(Class, TypeName, ModuleName)
+ when is_atom(Class), is_binary(TypeName), is_atom(ModuleName) ->
+ ok = sanity_check_module(class_module(Class), ModuleName),
true = ets:insert(?ETS_NAME,
- {internal_binary_to_type(TypeName), ModuleName}),
+ {{Class, internal_binary_to_type(TypeName)}, ModuleName}),
ok.
-sanity_check_module(Module) ->
- case catch lists:member(rabbit_exchange_type,
+sanity_check_module(ClassModule, Module) ->
+ case catch lists:member(ClassModule,
lists:flatten(
[Bs || {Attr, Bs} <-
Module:module_info(attributes),
Attr =:= behavior orelse
Attr =:= behaviour])) of
{'EXIT', {undef, _}} -> {error, not_module};
- false -> {error, not_exchange_type};
+ false -> {error, {not_type, ClassModule}};
true -> ok
end.
+class_module(exchange) -> rabbit_exchange_type;
+class_module(auth_mechanism) -> rabbit_auth_mechanism.
+
%%---------------------------------------------------------------------------
init([]) ->
?ETS_NAME = ets:new(?ETS_NAME, [protected, set, named_table]),
{ok, none}.
-handle_call({register, TypeName, ModuleName}, _From, State) ->
- ok = internal_register(TypeName, ModuleName),
+handle_call({register, Class, TypeName, ModuleName}, _From, State) ->
+ ok = internal_register(Class, TypeName, ModuleName),
{reply, ok, State};
+
handle_call(Request, _From, State) ->
{stop, {unhandled_call, Request}, State}.
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index 1d8ce23b..a4da23e2 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -36,6 +36,7 @@
-include_lib("public_key/include/public_key.hrl").
-export([peer_cert_issuer/1, peer_cert_subject/1, peer_cert_validity/1]).
+-export([peer_cert_subject_item/2]).
%%--------------------------------------------------------------------------
@@ -45,9 +46,11 @@
-type(certificate() :: binary()).
--spec(peer_cert_issuer/1 :: (certificate()) -> string()).
--spec(peer_cert_subject/1 :: (certificate()) -> string()).
--spec(peer_cert_validity/1 :: (certificate()) -> string()).
+-spec(peer_cert_issuer/1 :: (certificate()) -> string()).
+-spec(peer_cert_subject/1 :: (certificate()) -> string()).
+-spec(peer_cert_validity/1 :: (certificate()) -> string()).
+-spec(peer_cert_subject_item/2 ::
+ (certificate(), tuple()) -> string() | 'not_found').
-endif.
@@ -71,6 +74,14 @@ peer_cert_subject(Cert) ->
format_rdn_sequence(Subject)
end, Cert).
+%% Return a part of the certificate's subject.
+peer_cert_subject_item(Cert, Type) ->
+ cert_info(fun(#'OTPCertificate' {
+ tbsCertificate = #'OTPTBSCertificate' {
+ subject = Subject }}) ->
+ find_by_type(Type, Subject)
+ end, Cert).
+
%% Return a string describing the certificate's validity.
peer_cert_validity(Cert) ->
cert_info(fun(#'OTPCertificate' {
@@ -89,6 +100,14 @@ cert_info(F, Cert) ->
DecCert -> DecCert %%R14B onwards
end).
+find_by_type(Type, {rdnSequence, RDNs}) ->
+ case [V || #'AttributeTypeAndValue'{type = T, value = V}
+ <- lists:flatten(RDNs),
+ T == Type] of
+ [{printableString, S}] -> S;
+ [] -> not_found
+ end.
+
%%--------------------------------------------------------------------------
%% Formatting functions
%%--------------------------------------------------------------------------
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 0db51165..565c61e7 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -323,7 +323,7 @@
timestamp :: timestamp() }).
-type(delta() :: #delta { start_seq_id :: non_neg_integer(),
- count :: non_neg_integer (),
+ count :: non_neg_integer(),
end_seq_id :: non_neg_integer() }).
-type(sync() :: #sync { acks_persistent :: [[seq_id()]],
@@ -658,7 +658,7 @@ ack(AckTags, State) ->
ack(fun msg_store_remove/3,
fun ({_IsPersistent, Guid, _MsgProps}, State1) ->
remove_confirms(gb_sets:singleton(Guid), State1);
- (#msg_status{msg = #basic_message{guid = Guid}}, State1) ->
+ (#msg_status{msg = #basic_message { guid = Guid }}, State1) ->
remove_confirms(gb_sets:singleton(Guid), State1)
end,
AckTags, State),
@@ -1096,9 +1096,9 @@ blank_rate(Timestamp, IngressLength) ->
msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) ->
Self = self(),
F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
- Self, fun (StateN) -> tx_commit_post_msg_store(
- true, Pubs, AckTags,
- Fun, MsgPropsFun, StateN)
+ Self, fun (StateN) -> {[], tx_commit_post_msg_store(
+ true, Pubs, AckTags,
+ Fun, MsgPropsFun, StateN)}
end)
end,
fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler(
@@ -1311,10 +1311,8 @@ record_pending_ack(#msg_status { seq_id = SeqId,
ack_in_counter = AckInCount}) ->
{AckEntry, RAI1} =
case MsgOnDisk of
- true ->
- {{IsPersistent, Guid, MsgProps}, RAI};
- false ->
- {MsgStatus, gb_trees:insert(SeqId, Guid, RAI)}
+ true -> {{IsPersistent, Guid, MsgProps}, RAI};
+ false -> {MsgStatus, gb_trees:insert(SeqId, Guid, RAI)}
end,
PA1 = dict:store(SeqId, AckEntry, PA),
State #vqstate { pending_ack = PA1,
@@ -1325,8 +1323,8 @@ remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
- {SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3,
- {[], orddict:new()}, PA),
+ {PersistentSeqIds, GuidsByStore, _AllGuids} =
+ dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA),
State1 = State #vqstate { pending_ack = dict:new(),
ram_ack_index = gb_trees:empty() },
case KeepPersistent of
@@ -1336,18 +1334,17 @@ remove_pending_ack(KeepPersistent,
Guids),
State1
end;
- false -> IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
- ok = orddict:fold(
- fun (IsPersistent, Guids, ok) ->
- msg_store_remove(MSCState, IsPersistent, Guids)
- end, ok, GuidsByStore),
+ false -> IndexState1 =
+ rabbit_queue_index:ack(PersistentSeqIds, IndexState),
+ [ok = msg_store_remove(MSCState, IsPersistent, Guids)
+ || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)],
State1 #vqstate { index_state = IndexState1 }
end.
ack(_MsgStoreFun, _Fun, [], State) ->
{[], State};
ack(MsgStoreFun, Fun, AckTags, State) ->
- {{SeqIds, GuidsByStore},
+ {{PersistentSeqIds, GuidsByStore, AllGuids},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
persistent_count = PCount,
@@ -1361,27 +1358,30 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
pending_ack = dict:erase(SeqId, PA),
ram_ack_index =
gb_trees:delete_any(SeqId, RAI)})}
- end, {{[], orddict:new()}, State}, AckTags),
- IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
- AckdGuids = lists:concat(
- orddict:fold(
- fun (IsPersistent, Guids, Gs) ->
- MsgStoreFun(MSCState, IsPersistent, Guids),
- [Guids | Gs]
- end, [], GuidsByStore)),
+ end, {accumulate_ack_init(), State}, AckTags),
+ IndexState1 = rabbit_queue_index:ack(PersistentSeqIds, IndexState),
+ [ok = MsgStoreFun(MSCState, IsPersistent, Guids)
+ || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)],
PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
orddict:new(), GuidsByStore)),
- {AckdGuids, State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1,
- ack_out_counter = AckOutCount + length(AckTags) }}.
+ {lists:reverse(AllGuids),
+ State1 #vqstate { index_state = IndexState1,
+ persistent_count = PCount1,
+ ack_out_counter = AckOutCount + length(AckTags) }}.
+
+accumulate_ack_init() -> {[], orddict:new(), []}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
- index_on_disk = false }, Acc) ->
- Acc;
-accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps}, {SeqIdsAcc, Dict}) ->
- {cons_if(IsPersistent, SeqId, SeqIdsAcc),
- rabbit_misc:orddict_cons(IsPersistent, Guid, Dict)}.
+ index_on_disk = false,
+ guid = Guid },
+ {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) ->
+ {PersistentSeqIdsAcc, GuidsByStore, [Guid | AllGuids]};
+accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps},
+ {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) ->
+ {cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc),
+ rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore),
+ [Guid | AllGuids]}.
find_persistent_count(LensByStore) ->
case orddict:find(true, LensByStore) of
@@ -1401,13 +1401,13 @@ remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD,
unconfirmed = gb_sets:difference(UC, GuidSet) }.
msgs_confirmed(GuidSet, State) ->
- {{confirm, gb_sets:to_list(GuidSet)}, remove_confirms(GuidSet, State)}.
+ {gb_sets:to_list(GuidSet), remove_confirms(GuidSet, State)}.
msgs_written_to_disk(QPid, GuidSet) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun(State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
+ QPid, fun (State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
msgs_confirmed(gb_sets:intersection(GuidSet, MIOD),
State #vqstate {
msgs_on_disk =
@@ -1417,9 +1417,9 @@ msgs_written_to_disk(QPid, GuidSet) ->
msg_indices_written_to_disk(QPid, GuidSet) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun(State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
+ QPid, fun (State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
msgs_confirmed(gb_sets:intersection(GuidSet, MOD),
State #vqstate {
msg_indices_on_disk =