diff options
author | Jerry Kuch <jerryk@vmware.com> | 2010-12-16 10:38:48 -0800 |
---|---|---|
committer | Jerry Kuch <jerryk@vmware.com> | 2010-12-16 10:38:48 -0800 |
commit | 3a57d61a3982663b669b3ce84175e12a03d45a2c (patch) | |
tree | 251646c1d561a73eeb9df78b6016d001406e6fcd | |
parent | 4997583d3eaaaa34f49781cb50ae2a1c5b506383 (diff) | |
parent | a3ffc09c00caee590a1249c4bb809d2af8cc818d (diff) | |
download | rabbitmq-server-3a57d61a3982663b669b3ce84175e12a03d45a2c.tar.gz |
Large-ish merge from default.
32 files changed, 805 insertions, 274 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 392a479a..2419a54b 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -466,6 +466,25 @@ </varlistentry> <varlistentry> + <term><cmdsynopsis><command>clear_password</command> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term> + <listitem> + <variablelist> + <varlistentry> + <term>username</term> + <listitem><para>The name of the user whose password is to be cleared.</para></listitem> + </varlistentry> + </variablelist> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl clear_password tonyg</screen> + <para role="example"> + This command instructs the RabbitMQ broker to clear the + password for the user named + <command>tonyg</command>. This user now cannot log in with a password (but may be able to through e.g. SASL EXTERNAL if configured). + </para> + </listitem> + </varlistentry> + + <varlistentry> <term><cmdsynopsis><command>set_admin</command> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term> <listitem> <variablelist> @@ -1009,6 +1028,10 @@ <listitem><para>Version of the AMQP protocol in use (currently one of <command>{0,9,1}</command> or <command>{0,8,0}</command>). Note that if a client requests an AMQP 0-9 connection, we treat it as AMQP 0-9-1.</para></listitem> </varlistentry> <varlistentry> + <term>auth_mechanism</term> + <listitem><para>SASL authentication mechanism used, such as <command>PLAIN</command>.</para></listitem> + </varlistentry> + <varlistentry> <term>user</term> <listitem><para>Username associated with the connection.</para></listitem> </varlistentry> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 6c33ef8b..3888f198 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -30,4 +30,5 @@ {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}, {cluster_nodes, []}, {server_properties, []}, - {collect_statistics, none}]}]}. + {collect_statistics, none}, + {auth_mechanisms, ['PLAIN', 'AMQPLAIN']} ]} ]}. diff --git a/include/rabbit_auth_mechanism_spec.hrl b/include/rabbit_auth_mechanism_spec.hrl new file mode 100644 index 00000000..f8dc93fe --- /dev/null +++ b/include/rabbit_auth_mechanism_spec.hrl @@ -0,0 +1,41 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% +-ifdef(use_specs). + +-spec(description/0 :: () -> [{atom(), any()}]). +-spec(init/1 :: (rabbit_net:socket()) -> any()). +-spec(handle_response/2 :: (binary(), any()) -> + {'ok', rabbit_types:user()} | + {'challenge', binary(), any()} | + {'protocol_error', string(), [any()]} | + {'refused', string(), [any()]}). + +-endif. diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env index 36734874..8cb470d0 100755 --- a/scripts/rabbitmq-env +++ b/scripts/rabbitmq-env @@ -48,6 +48,8 @@ done SCRIPT_DIR=`dirname $SCRIPT_PATH` RABBITMQ_HOME="${SCRIPT_DIR}/.." +[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname` +NODENAME=rabbit@${HOSTNAME%%.*} # Load configuration from the rabbitmq.conf file [ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi index 59050692..33883702 100755 --- a/scripts/rabbitmq-multi +++ b/scripts/rabbitmq-multi @@ -29,8 +29,7 @@ ## ## Contributor(s): ______________________________________. ## -[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s` -NODENAME=rabbit@${HOSTNAME%%.*} + SCRIPT_HOME=$(dirname $0) PIDS_FILE=/var/lib/rabbitmq/pids MULTI_ERL_ARGS= diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 66ce4384..4155b31d 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -30,8 +30,6 @@ ## Contributor(s): ______________________________________. ## -[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s` -NODENAME=rabbit@${HOSTNAME%%.*} SERVER_ERL_ARGS="+K true +A30 +P 1048576 \ -kernel inet_default_listen_options [{nodelay,true}] \ -kernel inet_default_connect_options [{nodelay,true}]" @@ -92,6 +90,7 @@ if [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then -noinput \ -hidden \ -s rabbit_prelaunch \ + -sname rabbitmqprelaunch$$ \ -extra "$RABBITMQ_PLUGINS_DIR" "${RABBITMQ_PLUGINS_EXPAND_DIR}" "${RABBITMQ_NODENAME}" then RABBITMQ_BOOT_FILE="${RABBITMQ_PLUGINS_EXPAND_DIR}/rabbit" diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 872c87e3..52a250c6 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -118,6 +118,7 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin -pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
-s rabbit_prelaunch ^
+-sname rabbitmqprelaunch%RANDOM% ^
-extra "!RABBITMQ_PLUGINS_DIR:\=/!" ^
"!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!" ^
"!RABBITMQ_NODENAME!"
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl index 76ce25fd..56cff891 100755 --- a/scripts/rabbitmqctl +++ b/scripts/rabbitmqctl @@ -30,9 +30,6 @@ ## Contributor(s): ______________________________________. ## -[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s` -NODENAME=rabbit@${HOSTNAME%%.*} - . `dirname $0`/rabbitmq-env [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} diff --git a/src/rabbit.erl b/src/rabbit.erl index ace8f286..2ebfdecf 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -69,10 +69,10 @@ -rabbit_boot_step({external_infrastructure, [{description, "external infrastructure ready"}]}). --rabbit_boot_step({rabbit_exchange_type_registry, - [{description, "exchange type registry"}, +-rabbit_boot_step({rabbit_registry, + [{description, "plugin registry"}, {mfa, {rabbit_sup, start_child, - [rabbit_exchange_type_registry]}}, + [rabbit_registry]}}, {requires, external_infrastructure}, {enables, kernel_ready}]}). diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 1826347d..51adbac8 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -33,10 +33,10 @@ -include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). --export([check_login/2, user_pass_login/2, check_user_pass_login/2, +-export([user_pass_login/2, check_user_pass_login/2, make_salt/0, check_vhost_access/2, check_resource_access/3]). -export([add_user/2, delete_user/1, change_password/2, set_admin/1, - clear_admin/1, list_users/0, lookup_user/1]). + clear_admin/1, list_users/0, lookup_user/1, clear_password/1]). -export([change_password_hash/2, hash_password/1]). -export([add_vhost/1, delete_vhost/1, vhost_exists/1, list_vhosts/0]). -export([set_permissions/5, clear_permissions/2, @@ -54,15 +54,13 @@ -type(password() :: binary()). -type(password_hash() :: binary()). -type(regexp() :: binary()). --spec(check_login/2 :: - (binary(), binary()) -> rabbit_types:user() | - rabbit_types:channel_exit()). -spec(user_pass_login/2 :: (username(), password()) -> rabbit_types:user() | rabbit_types:channel_exit()). -spec(check_user_pass_login/2 :: (username(), password()) - -> {'ok', rabbit_types:user()} | 'refused'). + -> {'ok', rabbit_types:user()} | {'refused', string(), [any()]}). +-spec(make_salt/0 :: () -> binary()). -spec(check_vhost_access/2 :: (rabbit_types:user(), rabbit_types:vhost()) -> 'ok' | rabbit_types:channel_exit()). @@ -72,6 +70,7 @@ -spec(add_user/2 :: (username(), password()) -> 'ok'). -spec(delete_user/1 :: (username()) -> 'ok'). -spec(change_password/2 :: (username(), password()) -> 'ok'). +-spec(clear_password/1 :: (username()) -> 'ok'). -spec(change_password_hash/2 :: (username(), password_hash()) -> 'ok'). -spec(hash_password/1 :: (password()) -> password_hash()). -spec(set_admin/1 :: (username()) -> 'ok'). @@ -100,54 +99,26 @@ %%---------------------------------------------------------------------------- -%% SASL PLAIN, as used by the Qpid Java client and our clients. Also, -%% apparently, by OpenAMQ. -check_login(<<"PLAIN">>, Response) -> - [User, Pass] = [list_to_binary(T) || - T <- string:tokens(binary_to_list(Response), [0])], - user_pass_login(User, Pass); -%% AMQPLAIN, as used by Qpid Python test suite. The 0-8 spec actually -%% defines this as PLAIN, but in 0-9 that definition is gone, instead -%% referring generically to "SASL security mechanism", i.e. the above. -check_login(<<"AMQPLAIN">>, Response) -> - LoginTable = rabbit_binary_parser:parse_table(Response), - case {lists:keysearch(<<"LOGIN">>, 1, LoginTable), - lists:keysearch(<<"PASSWORD">>, 1, LoginTable)} of - {{value, {_, longstr, User}}, - {value, {_, longstr, Pass}}} -> - user_pass_login(User, Pass); - _ -> - %% Is this an information leak? - rabbit_misc:protocol_error( - access_refused, - "AMQPPLAIN auth info ~w is missing LOGIN or PASSWORD field", - [LoginTable]) - end; - -check_login(Mechanism, _Response) -> - rabbit_misc:protocol_error( - access_refused, "unsupported authentication mechanism '~s'", - [Mechanism]). - user_pass_login(User, Pass) -> ?LOGDEBUG("Login with user ~p pass ~p~n", [User, Pass]), case check_user_pass_login(User, Pass) of - refused -> + {refused, Msg, Args} -> rabbit_misc:protocol_error( - access_refused, "login refused for user '~s'", [User]); + access_refused, "login refused: ~s", [io_lib:format(Msg, Args)]); {ok, U} -> U end. -check_user_pass_login(User, Pass) -> - case lookup_user(User) of - {ok, U} -> - case check_password(Pass, U#user.password_hash) of - true -> {ok, U}; - _ -> refused +check_user_pass_login(Username, Pass) -> + Refused = {refused, "user '~s' - invalid credentials", [Username]}, + case lookup_user(Username) of + {ok, User} -> + case check_password(Pass, User#user.password_hash) of + true -> {ok, User}; + _ -> Refused end; {error, not_found} -> - refused + Refused end. internal_lookup_vhost_access(Username, VHostPath) -> @@ -250,6 +221,9 @@ delete_user(Username) -> change_password(Username, Password) -> change_password_hash(Username, hash_password(Password)). +clear_password(Username) -> + change_password_hash(Username, <<"">>). + change_password_hash(Username, PasswordHash) -> R = update_user(Username, fun(User) -> User#user{ password_hash = PasswordHash } diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 775c631d..71fd7a17 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -152,9 +152,9 @@ (name()) -> rabbit_types:ok_or_error('not_found') | rabbit_types:connection_exit()). -spec(maybe_run_queue_via_backing_queue/2 :: - (pid(), (fun ((A) -> A | {any(), A}))) -> 'ok'). + (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok'). -spec(maybe_run_queue_via_backing_queue_async/2 :: - (pid(), (fun ((A) -> A | {any(), A}))) -> 'ok'). + (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok'). -spec(update_ram_duration/1 :: (pid()) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 78bb6835..523f7c5e 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -199,6 +199,8 @@ terminate_shutdown(Fun, State) -> BQ:tx_rollback(Txn, BQSN), BQSN1 end, BQS, all_ch_record()), + [emit_consumer_deleted(Ch, CTag) + || {CTag, Ch, _} <- consumers(State1)], rabbit_event:notify(queue_deleted, [{pid, self()}]), State1#q{backing_queue_state = Fun(BQS1)} end. @@ -226,7 +228,7 @@ ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) -> {ok, TRef} = timer:apply_after( ?SYNC_INTERVAL, rabbit_amqqueue, maybe_run_queue_via_backing_queue, - [self(), fun (BQS) -> BQ:idle_timeout(BQS) end]), + [self(), fun (BQS) -> {[], BQ:idle_timeout(BQS)} end]), State#q{sync_timer_ref = TRef}; ensure_sync_timer(State) -> State. @@ -520,7 +522,7 @@ deliver_or_enqueue(Delivery, State) -> requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> maybe_run_queue_via_backing_queue( fun (BQS) -> - BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS) + {[], BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS)} end, State). fetch(AckRequired, State = #q{backing_queue_state = BQS, @@ -536,12 +538,19 @@ remove_consumer(ChPid, ConsumerTag, Queue) -> end, Queue). remove_consumers(ChPid, Queue) -> - queue:filter(fun ({CP, _}) -> CP /= ChPid end, Queue). + {Kept, Removed} = split_by_channel(ChPid, Queue), + [emit_consumer_deleted(Ch, CTag) || + {Ch, #consumer{tag = CTag}} <- queue:to_list(Removed)], + Kept. move_consumers(ChPid, From, To) -> + {Kept, Removed} = split_by_channel(ChPid, From), + {Kept, queue:join(To, Removed)}. + +split_by_channel(ChPid, Queue) -> {Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end, - queue:to_list(From)), - {queue:from_list(Kept), queue:join(To, queue:from_list(Removed))}. + queue:to_list(Queue)), + {queue:from_list(Kept), queue:from_list(Removed)}. possibly_unblock(State, ChPid, Update) -> case lookup_ch(ChPid) of @@ -617,12 +626,9 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> - {BQS2, State1} = - case Fun(BQS) of - {{confirm, Guids}, BQS1} -> {BQS1, confirm_messages(Guids, State)}; - BQS1 -> {BQS1, State} - end, - run_message_queue(State1#q{backing_queue_state = BQS2}). + {Guids, BQS1} = Fun(BQS), + run_message_queue( + confirm_messages(Guids, State#q{backing_queue_state = BQS1})). commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, backing_queue_state = BQS, @@ -724,12 +730,34 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> i(Item, _) -> throw({bad_argument, Item}). +consumers(#q{active_consumers = ActiveConsumers, + blocked_consumers = BlockedConsumers}) -> + rabbit_misc:queue_fold( + fun ({ChPid, #consumer{tag = ConsumerTag, + ack_required = AckRequired}}, Acc) -> + [{ChPid, ConsumerTag, AckRequired} | Acc] + end, [], queue:join(ActiveConsumers, BlockedConsumers)). + emit_stats(State) -> emit_stats(State, []). emit_stats(State, Extra) -> rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)). +emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) -> + rabbit_event:notify(consumer_created, + [{consumer_tag, ConsumerTag}, + {exclusive, Exclusive}, + {ack_required, AckRequired}, + {channel, ChPid}, + {queue, self()}]). + +emit_consumer_deleted(ChPid, ConsumerTag) -> + rabbit_event:notify(consumer_deleted, + [{consumer_tag, ConsumerTag}, + {channel, ChPid}, + {queue, self()}]). + %--------------------------------------------------------------------------- prioritise_call(Msg, _From, _State) -> @@ -792,14 +820,8 @@ handle_call({info, Items}, _From, State) -> catch Error -> reply({error, Error}, State) end; -handle_call(consumers, _From, - State = #q{active_consumers = ActiveConsumers, - blocked_consumers = BlockedConsumers}) -> - reply(rabbit_misc:queue_fold( - fun ({ChPid, #consumer{tag = ConsumerTag, - ack_required = AckRequired}}, Acc) -> - [{ChPid, ConsumerTag, AckRequired} | Acc] - end, [], queue:join(ActiveConsumers, BlockedConsumers)), State); +handle_call(consumers, _From, State) -> + reply(consumers(State), State); handle_call({deliver_immediately, Delivery = #delivery{message = Message}}, _From, State) -> @@ -902,6 +924,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, ChPid, Consumer, State1#q.active_consumers)}) end, + emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, + not NoAck), reply(ok, State2) end; @@ -920,6 +944,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, C1#cr{limiter_pid = undefined}; _ -> C1 end), + emit_consumer_deleted(ChPid, ConsumerTag), ok = maybe_send_reply(ChPid, OkMsg), NewState = State#q{exclusive_consumer = cancel_holder(ChPid, @@ -1107,7 +1132,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_info(timeout, State = #q{backing_queue = BQ}) -> noreply(maybe_run_queue_via_backing_queue( - fun (BQS) -> BQ:idle_timeout(BQS) end, State)); + fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State)); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl new file mode 100644 index 00000000..ce1b16ac --- /dev/null +++ b/src/rabbit_auth_mechanism.erl @@ -0,0 +1,57 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_auth_mechanism). + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [ + %% A description. + {description, 0}, + + %% Called before authentication starts. Should create a state + %% object to be passed through all the stages of authentication. + {init, 1}, + + %% Handle a stage of authentication. Possible responses: + %% {ok, User} + %% Authentication succeeded, and here's the user record. + %% {challenge, Challenge, NextState} + %% Another round is needed. Here's the state I want next time. + %% {protocol_error, Msg, Args} + %% Client got the protocol wrong. Log and die. + %% {refused, Msg, Args} + %% Client failed authentication. Log and die. + {handle_response, 2} + ]; +behaviour_info(_Other) -> + undefined. diff --git a/src/rabbit_auth_mechanism_amqplain.erl b/src/rabbit_auth_mechanism_amqplain.erl new file mode 100644 index 00000000..5d51d904 --- /dev/null +++ b/src/rabbit_auth_mechanism_amqplain.erl @@ -0,0 +1,70 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_auth_mechanism_amqplain). +-include("rabbit.hrl"). + +-behaviour(rabbit_auth_mechanism). + +-export([description/0, init/1, handle_response/2]). + +-include("rabbit_auth_mechanism_spec.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "auth mechanism amqplain"}, + {mfa, {rabbit_registry, register, + [auth_mechanism, <<"AMQPLAIN">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, kernel_ready}]}). + +%% AMQPLAIN, as used by Qpid Python test suite. The 0-8 spec actually +%% defines this as PLAIN, but in 0-9 that definition is gone, instead +%% referring generically to "SASL security mechanism", i.e. the above. + +description() -> + [{name, <<"AMQPLAIN">>}, + {description, <<"QPid AMQPLAIN mechanism">>}]. + +init(_Sock) -> + []. + +handle_response(Response, _State) -> + LoginTable = rabbit_binary_parser:parse_table(Response), + case {lists:keysearch(<<"LOGIN">>, 1, LoginTable), + lists:keysearch(<<"PASSWORD">>, 1, LoginTable)} of + {{value, {_, longstr, User}}, + {value, {_, longstr, Pass}}} -> + rabbit_access_control:check_user_pass_login(User, Pass); + _ -> + {protocol_error, + "AMQPLAIN auth info ~w is missing LOGIN or PASSWORD field", + [LoginTable]} + end. diff --git a/src/rabbit_auth_mechanism_cr_demo.erl b/src/rabbit_auth_mechanism_cr_demo.erl new file mode 100644 index 00000000..67665928 --- /dev/null +++ b/src/rabbit_auth_mechanism_cr_demo.erl @@ -0,0 +1,74 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_auth_mechanism_cr_demo). +-include("rabbit.hrl"). + +-behaviour(rabbit_auth_mechanism). + +-export([description/0, init/1, handle_response/2]). + +-include("rabbit_auth_mechanism_spec.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "auth mechanism cr-demo"}, + {mfa, {rabbit_registry, register, + [auth_mechanism, <<"RABBIT-CR-DEMO">>, + ?MODULE]}}, + {requires, rabbit_registry}, + {enables, kernel_ready}]}). + +-record(state, {username = undefined}). + +%% Provides equivalent security to PLAIN but demos use of Connection.Secure(Ok) +%% START-OK: Username +%% SECURE: "Please tell me your password" +%% SECURE-OK: "My password is ~s", [Password] + +description() -> + [{name, <<"RABBIT-CR-DEMO">>}, + {description, <<"RabbitMQ Demo challenge-response authentication " + "mechanism">>}]. + +init(_Sock) -> + #state{}. + +handle_response(Response, State = #state{username = undefined}) -> + {challenge, <<"Please tell me your password">>, + State#state{username = Response}}; + +handle_response(Response, #state{username = Username}) -> + case Response of + <<"My password is ", Password/binary>> -> + rabbit_access_control:check_user_pass_login(Username, Password); + _ -> + {protocol_error, "Invalid response '~s'", [Response]} + end. diff --git a/src/rabbit_auth_mechanism_external.erl b/src/rabbit_auth_mechanism_external.erl new file mode 100644 index 00000000..6572f786 --- /dev/null +++ b/src/rabbit_auth_mechanism_external.erl @@ -0,0 +1,107 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_auth_mechanism_external). +-include("rabbit.hrl"). + +-behaviour(rabbit_auth_mechanism). + +-export([description/0, init/1, handle_response/2]). + +-include("rabbit_auth_mechanism_spec.hrl"). + +-include_lib("public_key/include/public_key.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "auth mechanism external"}, + {mfa, {rabbit_registry, register, + [auth_mechanism, <<"EXTERNAL">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, kernel_ready}]}). + +-record(state, {username = undefined}). + +%% SASL EXTERNAL. SASL says EXTERNAL means "use credentials +%% established by means external to the mechanism". We define that to +%% mean the peer certificate's subject's CN. + +description() -> + [{name, <<"EXTERNAL">>}, + {description, <<"SASL EXTERNAL authentication mechanism">>}]. + +init(Sock) -> + Username = case rabbit_net:peercert(Sock) of + {ok, C} -> + CN = case rabbit_ssl:peer_cert_subject_item( + C, ?'id-at-commonName') of + not_found -> {refused, "no CN found", []}; + CN0 -> list_to_binary(CN0) + end, + case config_sane() of + true -> CN; + false -> {refused, "configuration unsafe", []} + end; + {error, no_peercert} -> + {refused, "no peer certificate", []}; + nossl -> + {refused, "not SSL connection", []} + end, + #state{username = Username}. + +handle_response(_Response, #state{username = Username}) -> + case Username of + {refused, _, _} = E -> + E; + _ -> + case rabbit_access_control:lookup_user(Username) of + {ok, User} -> + {ok, User}; + {error, not_found} -> + %% This is not an information leak as we have to + %% have validated a client cert to get this far. + {refused, "user '~s' not found", [Username]} + end + end. + +%%-------------------------------------------------------------------------- + +config_sane() -> + {ok, Opts} = application:get_env(ssl_options), + case {proplists:get_value(fail_if_no_peer_cert, Opts), + proplists:get_value(verify, Opts)} of + {true, verify_peer} -> + true; + {F, V} -> + rabbit_log:warning("EXTERNAL mechanism disabled, " + "fail_if_no_peer_cert=~p; " + "verify=~p~n", [F, V]), + false + end. diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl new file mode 100644 index 00000000..e5f8f3e6 --- /dev/null +++ b/src/rabbit_auth_mechanism_plain.erl @@ -0,0 +1,66 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_auth_mechanism_plain). +-include("rabbit.hrl"). + +-behaviour(rabbit_auth_mechanism). + +-export([description/0, init/1, handle_response/2]). + +-include("rabbit_auth_mechanism_spec.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "auth mechanism plain"}, + {mfa, {rabbit_registry, register, + [auth_mechanism, <<"PLAIN">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, kernel_ready}]}). + +%% SASL PLAIN, as used by the Qpid Java client and our clients. Also, +%% apparently, by OpenAMQ. + +description() -> + [{name, <<"PLAIN">>}, + {description, <<"SASL PLAIN authentication mechanism">>}]. + +init(_Sock) -> + []. + +handle_response(Response, _State) -> + %% The '%%"' at the end of the next line is for Emacs + case re:run(Response, "^\\0([^\\0]*)\\0([^\\0]*)$",%%" + [{capture, all_but_first, binary}]) of + {match, [User, Pass]} -> + rabbit_access_control:check_user_pass_login(User, Pass); + _ -> + {protocol_error, "response ~p invalid", [Response]} + end. diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 352e76fd..8603d8d7 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -77,7 +77,7 @@ behaviour_info(callbacks) -> {fetch, 2}, %% Acktags supplied are for messages which can now be forgotten - %% about. + %% about. Must return 1 guid per Ack, in the same order as Acks. {ack, 2}, %% A publish, but in the context of a transaction. diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 668fb9bb..ccadf5af 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -305,7 +305,7 @@ table_for_resource(#resource{kind = queue}) -> rabbit_queue. %% Used with atoms from records; e.g., the type is expected to exist. type_to_module(T) -> - {ok, Module} = rabbit_exchange_type_registry:lookup_module(T), + {ok, Module} = rabbit_registry:lookup_module(exchange, T), Module. contains(Table, MatchHead) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 162a62fa..d6cdf50d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,10 +35,10 @@ -behaviour(gen_server2). --export([start_link/7, do/2, do/3, shutdown/1]). --export([send_command/2, deliver/4, flushed/2]). +-export([start_link/7, do/2, do/3, flush/1, shutdown/1]). +-export([send_command/2, deliver/4, flushed/2, confirm/2, flush_confirms/1]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). --export([emit_stats/1, flush/1, flush_multiple_acks/1, confirm/2]). +-export([emit_stats/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, @@ -72,7 +72,7 @@ -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). --define(FLUSH_MULTIPLE_ACKS_INTERVAL, 1000). +-define(FLUSH_CONFIRMS_INTERVAL, 1000). %%---------------------------------------------------------------------------- @@ -90,12 +90,15 @@ -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'). +-spec(flush/1 :: (pid()) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(deliver/4 :: (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). +-spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok'). +-spec(flush_confirms/1 :: (pid()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (pid()) -> rabbit_types:infos()). @@ -103,8 +106,6 @@ -spec(info_all/0 :: () -> [rabbit_types:infos()]). -spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(emit_stats/1 :: (pid()) -> 'ok'). --spec(flush_multiple_acks/1 :: (pid()) -> 'ok'). --spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok'). -endif. @@ -121,6 +122,9 @@ do(Pid, Method) -> do(Pid, Method, Content) -> gen_server2:cast(Pid, {method, Method, Content}). +flush(Pid) -> + gen_server2:call(Pid, flush). + shutdown(Pid) -> gen_server2:cast(Pid, terminate). @@ -133,6 +137,12 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). +confirm(Pid, MsgSeqNo) -> + gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}). + +flush_confirms(Pid) -> + gen_server2:cast(Pid, flush_confirms). + list() -> pg_local:get_members(rabbit_channels). @@ -156,15 +166,6 @@ info_all(Items) -> emit_stats(Pid) -> gen_server2:cast(Pid, emit_stats). -flush(Pid) -> - gen_server2:call(Pid, flush). - -flush_multiple_acks(Pid) -> - gen_server2:cast(Pid, flush_multiple_acks). - -confirm(Pid, MsgSeqNo) -> - gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}). - %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, @@ -215,6 +216,9 @@ prioritise_cast(Msg, _State) -> _ -> 0 end. +handle_call(flush, _From, State) -> + reply(ok, State); + handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State); @@ -224,9 +228,6 @@ handle_call({info, Items}, _From, State) -> catch Error -> reply({error, Error}, State) end; -handle_call(flush, _From, State) -> - reply(ok, State); - handle_call(_Request, _From, State) -> noreply(State). @@ -261,10 +262,10 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> noreply(State); handle_cast({deliver, ConsumerTag, AckRequired, - Msg = {_QName, QPid, _MsgId, Redelivered, - #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, - content = Content}}}, + Msg = {_QName, QPid, _MsgId, Redelivered, + #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKey, + content = Content}}}, State = #ch{writer_pid = WriterPid, next_tag = DeliveryTag}) -> State1 = lock_message(AckRequired, @@ -291,11 +292,11 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, hibernate}; -handle_cast(flush_multiple_acks, State) -> - {noreply, flush_multiple(State)}; +handle_cast(flush_confirms, State) -> + {noreply, internal_flush_confirms(State)}; handle_cast({confirm, MsgSeqNo, From}, State) -> - {noreply, send_or_enqueue_ack(MsgSeqNo, From, State)}. + {noreply, confirm(MsgSeqNo, From, State)}. handle_info({'DOWN', _MRef, process, QPid, _Reason}, State = #ch{queues_for_msg = QFM}) -> @@ -303,7 +304,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) -> Qs = sets:del_element(QPid, QPids), case sets:size(Qs) of - 0 -> send_or_enqueue_ack(Msg, QPid, State0); + 0 -> confirm(Msg, QPid, State0); _ -> State0#ch{queues_for_msg = dict:store(Msg, Qs, QFM0)} end @@ -313,7 +314,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), - State1 = flush_multiple(State), + State1 = internal_flush_confirms(State), rabbit_event:if_enabled(StatsTimer, fun () -> internal_emit_stats( @@ -472,11 +473,11 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -send_or_enqueue_ack(undefined, _QPid, State) -> +confirm(undefined, _QPid, State) -> State; -send_or_enqueue_ack(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) -> +confirm(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) -> State; -send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) -> +confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) -> do_if_unconfirmed(MsgSeqNo, QPid, fun(MSN, State1 = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command( @@ -484,7 +485,7 @@ send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) -> delivery_tag = MSN}), State1 end, State); -send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) -> +confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) -> do_if_unconfirmed(MsgSeqNo, QPid, fun(MSN, State1 = #ch{held_confirms = As}) -> start_confirm_timer( @@ -1240,12 +1241,12 @@ is_message_persistent(Content) -> process_routing_result(unroutable, _, MsgSeqNo, Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_route), - send_or_enqueue_ack(MsgSeqNo, undefined, State); + confirm(MsgSeqNo, undefined, State); process_routing_result(not_delivered, _, MsgSeqNo, Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_consumers), - send_or_enqueue_ack(MsgSeqNo, undefined, State); + confirm(MsgSeqNo, undefined, State); process_routing_result(routed, [], MsgSeqNo, _, State) -> - send_or_enqueue_ack(MsgSeqNo, undefined, State); + confirm(MsgSeqNo, undefined, State); process_routing_result(routed, _, undefined, _, State) -> State; process_routing_result(routed, QPids, MsgSeqNo, _, @@ -1259,6 +1260,45 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> lock_message(false, _MsgStruct, State) -> State. +start_confirm_timer(State = #ch{confirm_tref = undefined}) -> + {ok, TRef} = timer:apply_after(?FLUSH_CONFIRMS_INTERVAL, + ?MODULE, flush_confirms, [self()]), + State#ch{confirm_tref = TRef}; +start_confirm_timer(State) -> + State. + +stop_confirm_timer(State = #ch{confirm_tref = undefined}) -> + State; +stop_confirm_timer(State = #ch{confirm_tref = TRef}) -> + {ok, cancel} = timer:cancel(TRef), + State#ch{confirm_tref = undefined}. + +internal_flush_confirms(State = #ch{writer_pid = WriterPid, + held_confirms = Cs}) -> + case gb_sets:is_empty(Cs) of + true -> State#ch{confirm_tref = undefined}; + false -> [First | Rest] = gb_sets:to_list(Cs), + {Mult, Inds} = find_consecutive_sequence(First, Rest), + ok = rabbit_writer:send_command( + WriterPid, + #'basic.ack'{delivery_tag = Mult, multiple = true}), + ok = lists:foldl( + fun(T, ok) -> rabbit_writer:send_command( + WriterPid, + #'basic.ack'{delivery_tag = T}) + end, ok, Inds), + State#ch{held_confirms = gb_sets:new(), + confirm_tref = undefined} + end. + +%% Find longest sequence of consecutive numbers at the beginning. +find_consecutive_sequence(Last, []) -> + {Last, []}; +find_consecutive_sequence(Last, [N | Ns]) when N == (Last + 1) -> + find_consecutive_sequence(N, Ns); +find_consecutive_sequence(Last, Ns) -> + {Last, Ns}. + terminate(State) -> stop_confirm_timer(State), pg_local:leave(rabbit_channels, self()), @@ -1346,42 +1386,3 @@ erase_queue_stats(QPid) -> erase({queue_stats, QPid}), [erase({queue_exchange_stats, QX}) || {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0]. - -start_confirm_timer(State = #ch{confirm_tref = undefined}) -> - {ok, TRef} = timer:apply_after(?FLUSH_MULTIPLE_ACKS_INTERVAL, - ?MODULE, flush_multiple_acks, [self()]), - State#ch{confirm_tref = TRef}; -start_confirm_timer(State) -> - State. - -stop_confirm_timer(State = #ch{confirm_tref = undefined}) -> - State; -stop_confirm_timer(State = #ch{confirm_tref = TRef}) -> - {ok, cancel} = timer:cancel(TRef), - State#ch{confirm_tref = undefined}. - -flush_multiple(State = #ch{writer_pid = WriterPid, - held_confirms = Cs}) -> - case gb_sets:is_empty(Cs) of - true -> State#ch{confirm_tref = undefined}; - false -> [First | Rest] = gb_sets:to_list(Cs), - {Mult, Inds} = find_consecutive_sequence(First, Rest), - ok = rabbit_writer:send_command( - WriterPid, - #'basic.ack'{delivery_tag = Mult, multiple = true}), - ok = lists:foldl( - fun(T, ok) -> rabbit_writer:send_command( - WriterPid, - #'basic.ack'{delivery_tag = T}) - end, ok, Inds), - State#ch{held_confirms = gb_sets:new(), - confirm_tref = undefined} - end. - -%% Find longest sequence of consecutive numbers at the beginning. -find_consecutive_sequence(Last, []) -> - {Last, []}; -find_consecutive_sequence(Last, [N | Ns]) when N == (Last + 1) -> - find_consecutive_sequence(N, Ns); -find_consecutive_sequence(Last, Ns) -> - {Last, Ns}. diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 360217a2..df55d961 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -211,6 +211,10 @@ action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) -> Inform("Changing password for user ~p", [Username]), call(Node, {rabbit_access_control, change_password, Args}); +action(clear_password, Node, Args = [Username], _Opts, Inform) -> + Inform("Clearing password for user ~p", [Username]), + call(Node, {rabbit_access_control, clear_password, Args}); + action(set_admin, Node, [Username], _Opts, Inform) -> Inform("Setting administrative status for user ~p", [Username]), call(Node, {rabbit_access_control, set_admin, [Username]}); diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 501af585..a95cf0b1 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -152,17 +152,17 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> %% Used with atoms from records; e.g., the type is expected to exist. type_to_module(T) -> - {ok, Module} = rabbit_exchange_type_registry:lookup_module(T), + {ok, Module} = rabbit_registry:lookup_module(exchange, T), Module. %% Used with binaries sent over the wire; the type may not exist. check_type(TypeBin) -> - case rabbit_exchange_type_registry:binary_to_type(TypeBin) of + case rabbit_registry:binary_to_type(TypeBin) of {error, not_found} -> rabbit_misc:protocol_error( command_invalid, "unknown exchange type '~s'", [TypeBin]); T -> - case rabbit_exchange_type_registry:lookup_module(T) of + case rabbit_registry:lookup_module(exchange, T) of {error, not_found} -> rabbit_misc:protocol_error( command_invalid, "invalid exchange type '~s'", [T]); diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index d934a497..d49d0199 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -41,9 +41,9 @@ -rabbit_boot_step({?MODULE, [{description, "exchange type direct"}, - {mfa, {rabbit_exchange_type_registry, register, - [<<"direct">>, ?MODULE]}}, - {requires, rabbit_exchange_type_registry}, + {mfa, {rabbit_registry, register, + [exchange, <<"direct">>, ?MODULE]}}, + {requires, rabbit_registry}, {enables, kernel_ready}]}). description() -> diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index 77ca9686..e7f75464 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -41,9 +41,9 @@ -rabbit_boot_step({?MODULE, [{description, "exchange type fanout"}, - {mfa, {rabbit_exchange_type_registry, register, - [<<"fanout">>, ?MODULE]}}, - {requires, rabbit_exchange_type_registry}, + {mfa, {rabbit_registry, register, + [exchange, <<"fanout">>, ?MODULE]}}, + {requires, rabbit_registry}, {enables, kernel_ready}]}). description() -> diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index ec9e7ba4..caf141fe 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -42,9 +42,9 @@ -rabbit_boot_step({?MODULE, [{description, "exchange type headers"}, - {mfa, {rabbit_exchange_type_registry, register, - [<<"headers">>, ?MODULE]}}, - {requires, rabbit_exchange_type_registry}, + {mfa, {rabbit_registry, register, + [exchange, <<"headers">>, ?MODULE]}}, + {requires, rabbit_registry}, {enables, kernel_ready}]}). -ifdef(use_specs). diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index d3ecdd4d..44851858 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -41,9 +41,9 @@ -rabbit_boot_step({?MODULE, [{description, "exchange type topic"}, - {mfa, {rabbit_exchange_type_registry, register, - [<<"topic">>, ?MODULE]}}, - {requires, rabbit_exchange_type_registry}, + {mfa, {rabbit_registry, register, + [exchange, <<"topic">>, ?MODULE]}}, + {requires, rabbit_registry}, {enables, kernel_ready}]}). -export([topic_matches/2]). diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index a62e7a6f..dadfc16e 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -388,7 +388,7 @@ init_db(ClusterNodes, Force) -> ensure_version_ok(rabbit_upgrade:read_version()), ensure_schema_ok(); {[], false, _} -> - %% First RAM node in cluster, start from scratch + %% Nothing there at all, start from scratch ok = create_schema(); {[AnotherNode|_], _, _} -> %% Subsequent node in cluster, catch up diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index 867ecb12..8ae45abd 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -53,7 +53,7 @@ start() -> io:format("Activating RabbitMQ plugins ...~n"), %% Determine our various directories - [PluginDir, UnpackedPluginDir, Node] = init:get_plain_arguments(), + [PluginDir, UnpackedPluginDir, NodeStr] = init:get_plain_arguments(), RootName = UnpackedPluginDir ++ "/rabbit", %% Unpack any .ez plugins @@ -132,7 +132,7 @@ start() -> || App <- PluginApps], io:nl(), - ok = duplicate_node_check(Node), + ok = duplicate_node_check(NodeStr), terminate(0), ok. @@ -259,7 +259,8 @@ process_entry(Entry) -> duplicate_node_check([]) -> %% Ignore running node while installing windows service ok; -duplicate_node_check(Node) -> +duplicate_node_check(NodeStr) -> + Node = rabbit_misc:makenode(NodeStr), {NodeName, NodeHost} = rabbit_misc:nodeparts(Node), case net_adm:names(NodeHost) of {ok, NamePorts} -> @@ -272,7 +273,6 @@ duplicate_node_check(Node) -> terminate(?ERROR_CODE); false -> ok end; - {error, address} -> ok; {error, EpmdReason} -> terminate("unexpected epmd error: ~p~n", [EpmdReason]) end. @@ -283,12 +283,9 @@ terminate(Fmt, Args) -> terminate(Status) -> case os:type() of - {unix, _} -> - halt(Status); - {win32, _} -> - init:stop(Status), - receive - after infinity -> ok - end + {unix, _} -> halt(Status); + {win32, _} -> init:stop(Status), + receive + after infinity -> ok + end end. - diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 4dd150a2..92a2f4d7 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -56,14 +56,15 @@ -record(v1, {parent, sock, connection, callback, recv_length, recv_ref, connection_state, queue_collector, heartbeater, stats_timer, - channel_sup_sup_pid, start_heartbeat_fun}). + channel_sup_sup_pid, start_heartbeat_fun, auth_mechanism, + auth_state}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels]). -define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl, peer_cert_subject, peer_cert_issuer, - peer_cert_validity, + peer_cert_validity, auth_mechanism, protocol, user, vhost, timeout, frame_max, client_properties]). @@ -294,7 +295,9 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, stats_timer = rabbit_event:init_stats_timer(), channel_sup_sup_pid = ChannelSupSupPid, - start_heartbeat_fun = StartHeartbeatFun + start_heartbeat_fun = StartHeartbeatFun, + auth_mechanism = none, + auth_state = none }, handshake, 8)) catch @@ -681,11 +684,12 @@ handle_input(Callback, Data, _State) -> start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, Protocol, State = #v1{sock = Sock, connection = Connection}) -> - Start = #'connection.start'{ version_major = ProtocolMajor, - version_minor = ProtocolMinor, - server_properties = server_properties(), - mechanisms = <<"PLAIN AMQPLAIN">>, - locales = <<"en_US">> }, + Start = #'connection.start'{ + version_major = ProtocolMajor, + version_minor = ProtocolMinor, + server_properties = server_properties(), + mechanisms = auth_mechanisms_binary(), + locales = <<"en_US">> }, ok = send_on_channel0(Sock, Start, Protocol), switch_callback(State#v1{connection = Connection#connection{ timeout_sec = ?NORMAL_TIMEOUT, @@ -710,42 +714,45 @@ ensure_stats_timer(State) -> handle_method0(MethodName, FieldsBin, State = #v1{connection = #connection{protocol = Protocol}}) -> - try - handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin), - State) - catch exit:Reason -> - CompleteReason = case Reason of - #amqp_error{method = none} -> - Reason#amqp_error{method = MethodName}; - OtherReason -> OtherReason - end, + HandleException = + fun(R) -> case ?IS_RUNNING(State) of - true -> send_exception(State, 0, CompleteReason); + true -> send_exception(State, 0, R); %% We don't trust the client at this point - force %% them to wait for a bit so they can't DOS us with %% repeated failed logins etc. false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000), - throw({channel0_error, State#v1.connection_state, - CompleteReason}) + throw({channel0_error, State#v1.connection_state, R}) end + end, + try + handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin), + State) + catch exit:#amqp_error{method = none} = Reason -> + HandleException(Reason#amqp_error{method = MethodName}); + Type:Reason -> + HandleException({Type, Reason, MethodName, erlang:get_stacktrace()}) end. handle_method0(#'connection.start_ok'{mechanism = Mechanism, response = Response, client_properties = ClientProperties}, - State = #v1{connection_state = starting, - connection = Connection = - #connection{protocol = Protocol}, - sock = Sock}) -> - User = rabbit_access_control:check_login(Mechanism, Response), - Tune = #'connection.tune'{channel_max = 0, - frame_max = ?FRAME_MAX, - heartbeat = 0}, - ok = send_on_channel0(Sock, Tune, Protocol), - State#v1{connection_state = tuning, - connection = Connection#connection{ - user = User, - client_properties = ClientProperties}}; + State0 = #v1{connection_state = starting, + connection = Connection, + sock = Sock}) -> + AuthMechanism = auth_mechanism_to_module(Mechanism), + State = State0#v1{auth_mechanism = AuthMechanism, + auth_state = AuthMechanism:init(Sock), + connection_state = securing, + connection = + Connection#connection{ + client_properties = ClientProperties}}, + auth_phase(Response, State); + +handle_method0(#'connection.secure_ok'{response = Response}, + State = #v1{connection_state = securing}) -> + auth_phase(Response, State); + handle_method0(#'connection.tune_ok'{frame_max = FrameMax, heartbeat = ClientHeartbeat}, State = #v1{connection_state = tuning, @@ -827,6 +834,61 @@ handle_method0(_Method, #v1{connection_state = S}) -> send_on_channel0(Sock, Method, Protocol) -> ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol). +auth_mechanism_to_module(TypeBin) -> + case rabbit_registry:binary_to_type(TypeBin) of + {error, not_found} -> + rabbit_misc:protocol_error( + command_invalid, "unknown authentication mechanism '~s'", + [TypeBin]); + T -> + case {lists:member(T, auth_mechanisms()), + rabbit_registry:lookup_module(auth_mechanism, T)} of + {true, {ok, Module}} -> + Module; + _ -> + rabbit_misc:protocol_error( + command_invalid, + "invalid authentication mechanism '~s'", [T]) + end + end. + +auth_mechanisms() -> + {ok, Configured} = application:get_env(auth_mechanisms), + [Name || {Name, _Module} <- rabbit_registry:lookup_all(auth_mechanism), + lists:member(Name, Configured)]. + +auth_mechanisms_binary() -> + list_to_binary( + string:join( + [atom_to_list(A) || A <- auth_mechanisms()], " ")). + +auth_phase(Response, + State = #v1{auth_mechanism = AuthMechanism, + auth_state = AuthState, + connection = Connection = + #connection{protocol = Protocol}, + sock = Sock}) -> + case AuthMechanism:handle_response(Response, AuthState) of + {refused, Msg, Args} -> + rabbit_misc:protocol_error( + access_refused, "~s login refused: ~s", + [proplists:get_value(name, AuthMechanism:description()), + io_lib:format(Msg, Args)]); + {protocol_error, Msg, Args} -> + rabbit_misc:protocol_error(syntax_error, Msg, Args); + {challenge, Challenge, AuthState1} -> + Secure = #'connection.secure'{challenge = Challenge}, + ok = send_on_channel0(Sock, Secure, Protocol), + State#v1{auth_state = AuthState1}; + {ok, User} -> + Tune = #'connection.tune'{channel_max = 0, + frame_max = ?FRAME_MAX, + heartbeat = 0}, + ok = send_on_channel0(Sock, Tune, Protocol), + State#v1{connection_state = tuning, + connection = Connection#connection{user = User}} + end. + %%-------------------------------------------------------------------------- infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. @@ -864,6 +926,10 @@ i(protocol, #v1{connection = #connection{protocol = none}}) -> none; i(protocol, #v1{connection = #connection{protocol = Protocol}}) -> Protocol:version(); +i(auth_mechanism, #v1{auth_mechanism = none}) -> + none; +i(auth_mechanism, #v1{auth_mechanism = Mechanism}) -> + proplists:get_value(name, Mechanism:description()); i(user, #v1{connection = #connection{user = #user{username = Username}}}) -> Username; i(user, #v1{connection = #connection{user = none}}) -> diff --git a/src/rabbit_exchange_type_registry.erl b/src/rabbit_registry.erl index f15275b5..7a3fcb51 100644 --- a/src/rabbit_exchange_type_registry.erl +++ b/src/rabbit_registry.erl @@ -29,7 +29,7 @@ %% Contributor(s): ______________________________________. %% --module(rabbit_exchange_type_registry). +-module(rabbit_registry). -behaviour(gen_server). @@ -38,7 +38,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([register/2, binary_to_type/1, lookup_module/1]). +-export([register/3, binary_to_type/1, lookup_module/2, lookup_all/1]). -define(SERVER, ?MODULE). -define(ETS_NAME, ?MODULE). @@ -46,11 +46,12 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(register/2 :: (binary(), atom()) -> 'ok'). +-spec(register/3 :: (atom(), binary(), atom()) -> 'ok'). -spec(binary_to_type/1 :: (binary()) -> atom() | rabbit_types:error('not_found')). --spec(lookup_module/1 :: - (atom()) -> rabbit_types:ok_or_error2(atom(), 'not_found')). +-spec(lookup_module/2 :: + (atom(), atom()) -> rabbit_types:ok_or_error2(atom(), 'not_found')). +-spec(lookup_all/1 :: (atom()) -> [{atom(), atom()}]). -endif. @@ -61,8 +62,8 @@ start_link() -> %%--------------------------------------------------------------------------- -register(TypeName, ModuleName) -> - gen_server:call(?SERVER, {register, TypeName, ModuleName}). +register(Class, TypeName, ModuleName) -> + gen_server:call(?SERVER, {register, Class, TypeName, ModuleName}). %% This is used with user-supplied arguments (e.g., on exchange %% declare), so we restrict it to existing atoms only. This means it @@ -74,47 +75,54 @@ binary_to_type(TypeBin) when is_binary(TypeBin) -> TypeAtom -> TypeAtom end. -lookup_module(T) when is_atom(T) -> - case ets:lookup(?ETS_NAME, T) of +lookup_module(Class, T) when is_atom(T) -> + case ets:lookup(?ETS_NAME, {Class, T}) of [{_, Module}] -> {ok, Module}; [] -> {error, not_found} end. +lookup_all(Class) -> + [{K, V} || [K, V] <- ets:match(?ETS_NAME, {{Class, '$1'}, '$2'})]. + %%--------------------------------------------------------------------------- internal_binary_to_type(TypeBin) when is_binary(TypeBin) -> list_to_atom(binary_to_list(TypeBin)). -internal_register(TypeName, ModuleName) - when is_binary(TypeName), is_atom(ModuleName) -> - ok = sanity_check_module(ModuleName), +internal_register(Class, TypeName, ModuleName) + when is_atom(Class), is_binary(TypeName), is_atom(ModuleName) -> + ok = sanity_check_module(class_module(Class), ModuleName), true = ets:insert(?ETS_NAME, - {internal_binary_to_type(TypeName), ModuleName}), + {{Class, internal_binary_to_type(TypeName)}, ModuleName}), ok. -sanity_check_module(Module) -> - case catch lists:member(rabbit_exchange_type, +sanity_check_module(ClassModule, Module) -> + case catch lists:member(ClassModule, lists:flatten( [Bs || {Attr, Bs} <- Module:module_info(attributes), Attr =:= behavior orelse Attr =:= behaviour])) of {'EXIT', {undef, _}} -> {error, not_module}; - false -> {error, not_exchange_type}; + false -> {error, {not_type, ClassModule}}; true -> ok end. +class_module(exchange) -> rabbit_exchange_type; +class_module(auth_mechanism) -> rabbit_auth_mechanism. + %%--------------------------------------------------------------------------- init([]) -> ?ETS_NAME = ets:new(?ETS_NAME, [protected, set, named_table]), {ok, none}. -handle_call({register, TypeName, ModuleName}, _From, State) -> - ok = internal_register(TypeName, ModuleName), +handle_call({register, Class, TypeName, ModuleName}, _From, State) -> + ok = internal_register(Class, TypeName, ModuleName), {reply, ok, State}; + handle_call(Request, _From, State) -> {stop, {unhandled_call, Request}, State}. diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl index 1d8ce23b..a4da23e2 100644 --- a/src/rabbit_ssl.erl +++ b/src/rabbit_ssl.erl @@ -36,6 +36,7 @@ -include_lib("public_key/include/public_key.hrl"). -export([peer_cert_issuer/1, peer_cert_subject/1, peer_cert_validity/1]). +-export([peer_cert_subject_item/2]). %%-------------------------------------------------------------------------- @@ -45,9 +46,11 @@ -type(certificate() :: binary()). --spec(peer_cert_issuer/1 :: (certificate()) -> string()). --spec(peer_cert_subject/1 :: (certificate()) -> string()). --spec(peer_cert_validity/1 :: (certificate()) -> string()). +-spec(peer_cert_issuer/1 :: (certificate()) -> string()). +-spec(peer_cert_subject/1 :: (certificate()) -> string()). +-spec(peer_cert_validity/1 :: (certificate()) -> string()). +-spec(peer_cert_subject_item/2 :: + (certificate(), tuple()) -> string() | 'not_found'). -endif. @@ -71,6 +74,14 @@ peer_cert_subject(Cert) -> format_rdn_sequence(Subject) end, Cert). +%% Return a part of the certificate's subject. +peer_cert_subject_item(Cert, Type) -> + cert_info(fun(#'OTPCertificate' { + tbsCertificate = #'OTPTBSCertificate' { + subject = Subject }}) -> + find_by_type(Type, Subject) + end, Cert). + %% Return a string describing the certificate's validity. peer_cert_validity(Cert) -> cert_info(fun(#'OTPCertificate' { @@ -89,6 +100,14 @@ cert_info(F, Cert) -> DecCert -> DecCert %%R14B onwards end). +find_by_type(Type, {rdnSequence, RDNs}) -> + case [V || #'AttributeTypeAndValue'{type = T, value = V} + <- lists:flatten(RDNs), + T == Type] of + [{printableString, S}] -> S; + [] -> not_found + end. + %%-------------------------------------------------------------------------- %% Formatting functions %%-------------------------------------------------------------------------- diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 0db51165..565c61e7 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -323,7 +323,7 @@ timestamp :: timestamp() }). -type(delta() :: #delta { start_seq_id :: non_neg_integer(), - count :: non_neg_integer (), + count :: non_neg_integer(), end_seq_id :: non_neg_integer() }). -type(sync() :: #sync { acks_persistent :: [[seq_id()]], @@ -658,7 +658,7 @@ ack(AckTags, State) -> ack(fun msg_store_remove/3, fun ({_IsPersistent, Guid, _MsgProps}, State1) -> remove_confirms(gb_sets:singleton(Guid), State1); - (#msg_status{msg = #basic_message{guid = Guid}}, State1) -> + (#msg_status{msg = #basic_message { guid = Guid }}, State1) -> remove_confirms(gb_sets:singleton(Guid), State1) end, AckTags, State), @@ -1096,9 +1096,9 @@ blank_rate(Timestamp, IngressLength) -> msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) -> Self = self(), F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( - Self, fun (StateN) -> tx_commit_post_msg_store( - true, Pubs, AckTags, - Fun, MsgPropsFun, StateN) + Self, fun (StateN) -> {[], tx_commit_post_msg_store( + true, Pubs, AckTags, + Fun, MsgPropsFun, StateN)} end) end, fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler( @@ -1311,10 +1311,8 @@ record_pending_ack(#msg_status { seq_id = SeqId, ack_in_counter = AckInCount}) -> {AckEntry, RAI1} = case MsgOnDisk of - true -> - {{IsPersistent, Guid, MsgProps}, RAI}; - false -> - {MsgStatus, gb_trees:insert(SeqId, Guid, RAI)} + true -> {{IsPersistent, Guid, MsgProps}, RAI}; + false -> {MsgStatus, gb_trees:insert(SeqId, Guid, RAI)} end, PA1 = dict:store(SeqId, AckEntry, PA), State #vqstate { pending_ack = PA1, @@ -1325,8 +1323,8 @@ remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, index_state = IndexState, msg_store_clients = MSCState }) -> - {SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3, - {[], orddict:new()}, PA), + {PersistentSeqIds, GuidsByStore, _AllGuids} = + dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA), State1 = State #vqstate { pending_ack = dict:new(), ram_ack_index = gb_trees:empty() }, case KeepPersistent of @@ -1336,18 +1334,17 @@ remove_pending_ack(KeepPersistent, Guids), State1 end; - false -> IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), - ok = orddict:fold( - fun (IsPersistent, Guids, ok) -> - msg_store_remove(MSCState, IsPersistent, Guids) - end, ok, GuidsByStore), + false -> IndexState1 = + rabbit_queue_index:ack(PersistentSeqIds, IndexState), + [ok = msg_store_remove(MSCState, IsPersistent, Guids) + || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)], State1 #vqstate { index_state = IndexState1 } end. ack(_MsgStoreFun, _Fun, [], State) -> {[], State}; ack(MsgStoreFun, Fun, AckTags, State) -> - {{SeqIds, GuidsByStore}, + {{PersistentSeqIds, GuidsByStore, AllGuids}, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, persistent_count = PCount, @@ -1361,27 +1358,30 @@ ack(MsgStoreFun, Fun, AckTags, State) -> pending_ack = dict:erase(SeqId, PA), ram_ack_index = gb_trees:delete_any(SeqId, RAI)})} - end, {{[], orddict:new()}, State}, AckTags), - IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), - AckdGuids = lists:concat( - orddict:fold( - fun (IsPersistent, Guids, Gs) -> - MsgStoreFun(MSCState, IsPersistent, Guids), - [Guids | Gs] - end, [], GuidsByStore)), + end, {accumulate_ack_init(), State}, AckTags), + IndexState1 = rabbit_queue_index:ack(PersistentSeqIds, IndexState), + [ok = MsgStoreFun(MSCState, IsPersistent, Guids) + || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)], PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len( orddict:new(), GuidsByStore)), - {AckdGuids, State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1, - ack_out_counter = AckOutCount + length(AckTags) }}. + {lists:reverse(AllGuids), + State1 #vqstate { index_state = IndexState1, + persistent_count = PCount1, + ack_out_counter = AckOutCount + length(AckTags) }}. + +accumulate_ack_init() -> {[], orddict:new(), []}. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, - index_on_disk = false }, Acc) -> - Acc; -accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps}, {SeqIdsAcc, Dict}) -> - {cons_if(IsPersistent, SeqId, SeqIdsAcc), - rabbit_misc:orddict_cons(IsPersistent, Guid, Dict)}. + index_on_disk = false, + guid = Guid }, + {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) -> + {PersistentSeqIdsAcc, GuidsByStore, [Guid | AllGuids]}; +accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps}, + {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) -> + {cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc), + rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore), + [Guid | AllGuids]}. find_persistent_count(LensByStore) -> case orddict:find(true, LensByStore) of @@ -1401,13 +1401,13 @@ remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD, unconfirmed = gb_sets:difference(UC, GuidSet) }. msgs_confirmed(GuidSet, State) -> - {{confirm, gb_sets:to_list(GuidSet)}, remove_confirms(GuidSet, State)}. + {gb_sets:to_list(GuidSet), remove_confirms(GuidSet, State)}. msgs_written_to_disk(QPid, GuidSet) -> rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - QPid, fun(State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> + QPid, fun (State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> msgs_confirmed(gb_sets:intersection(GuidSet, MIOD), State #vqstate { msgs_on_disk = @@ -1417,9 +1417,9 @@ msgs_written_to_disk(QPid, GuidSet) -> msg_indices_written_to_disk(QPid, GuidSet) -> rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - QPid, fun(State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> + QPid, fun (State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> msgs_confirmed(gb_sets:intersection(GuidSet, MOD), State #vqstate { msg_indices_on_disk = |