diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-01-18 11:59:56 +0000 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-01-18 11:59:56 +0000 |
commit | cfa09e5fd38baa616b01c002e7e9e34bdc437b56 (patch) | |
tree | 2bd6b9d81295d0b3c2e00febc6bc5b8eeb96076f | |
parent | 83ebec1bcf772c48c786e354a0c03db16d664706 (diff) | |
parent | ad5f36fdaee422405631e3d0d8ee1c3e7db3407c (diff) | |
download | rabbitmq-server-cfa09e5fd38baa616b01c002e7e9e34bdc437b56.tar.gz |
merge from default
-rw-r--r-- | src/rabbit.erl | 22 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 44 | ||||
-rw-r--r-- | src/rabbit_auth_mechanism_external.erl | 107 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 34 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 13 |
5 files changed, 67 insertions, 153 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 954e289b..3cfba03e 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -373,6 +373,14 @@ home_dir() -> Other -> Other end. +config_files() -> + case init:get_argument(config) of + {ok, Files} -> [filename:absname( + filename:rootname(File, ".config") ++ ".config") || + File <- Files]; + error -> [] + end. + %--------------------------------------------------------------------------- print_banner() -> @@ -398,14 +406,24 @@ print_banner() -> Settings = [{"node", node()}, {"app descriptor", app_location()}, {"home dir", home_dir()}, + {"config file(s)", config_files()}, {"cookie hash", rabbit_misc:cookie_hash()}, {"log", log_location(kernel)}, {"sasl log", log_location(sasl)}, {"database dir", rabbit_mnesia:dir()}, {"erlang version", erlang:system_info(version)}], DescrLen = 1 + lists:max([length(K) || {K, _V} <- Settings]), - Format = "~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n", - lists:foreach(fun ({K, V}) -> io:format(Format, [K, V]) end, Settings), + Format = fun (K, V) -> + io:format("~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n", + [K, V]) + end, + lists:foreach(fun ({"config file(s)" = K, []}) -> + Format(K, "(none)"); + ({"config file(s)" = K, [V0 | Vs]}) -> + Format(K, V0), [Format("", V) || V <- Vs]; + ({K, V}) -> + Format(K, V) + end, Settings), io:nl(). ensure_working_log_handlers() -> diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 20097a7d..9626e126 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -212,30 +212,23 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> Q1 -> Q1 end. -internal_declare(Q = #amqqueue{name = QueueName}, Recover) -> +internal_declare(Q, true) -> + rabbit_misc:execute_mnesia_tx_with_tail( + fun () -> ok = store_queue(Q), rabbit_misc:const(Q) end); +internal_declare(Q = #amqqueue{name = QueueName}, false) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> - case Recover of - true -> - ok = store_queue(Q), - rabbit_misc:const(Q); - false -> - case mnesia:wread({rabbit_queue, QueueName}) of - [] -> - case mnesia:read({rabbit_durable_queue, - QueueName}) of - [] -> ok = store_queue(Q), - B = add_default_binding(Q), - fun (Tx) -> - B(Tx), - Q - end; - [_] -> %% Q exists on stopped node - rabbit_misc:const(not_found) - end; - [ExistingQ] -> - rabbit_misc:const(ExistingQ) - end + case mnesia:wread({rabbit_queue, QueueName}) of + [] -> + case mnesia:read({rabbit_durable_queue, QueueName}) of + [] -> ok = store_queue(Q), + B = add_default_binding(Q), + fun (Tx) -> B(Tx), Q end; + [_] -> %% Q exists on stopped node + rabbit_misc:const(not_found) + end; + [ExistingQ] -> + rabbit_misc:const(ExistingQ) end end). @@ -494,10 +487,9 @@ on_node_down(Node) -> end, fun (Deletions, Tx) -> rabbit_binding:process_deletions( - lists:foldl( - fun rabbit_binding:combine_deletions/2, - rabbit_binding:new_deletions(), - Deletions), + lists:foldl(fun rabbit_binding:combine_deletions/2, + rabbit_binding:new_deletions(), + Deletions), Tx) end). diff --git a/src/rabbit_auth_mechanism_external.erl b/src/rabbit_auth_mechanism_external.erl deleted file mode 100644 index 1c4e5c15..00000000 --- a/src/rabbit_auth_mechanism_external.erl +++ /dev/null @@ -1,107 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_auth_mechanism_external). --include("rabbit.hrl"). - --behaviour(rabbit_auth_mechanism). - --export([description/0, init/1, handle_response/2]). - --include("rabbit_auth_mechanism_spec.hrl"). - --include_lib("public_key/include/public_key.hrl"). - --rabbit_boot_step({?MODULE, - [{description, "auth mechanism external"}, - {mfa, {rabbit_registry, register, - [auth_mechanism, <<"EXTERNAL">>, ?MODULE]}}, - {requires, rabbit_registry}, - {enables, kernel_ready}]}). - --record(state, {username = undefined}). - -%% SASL EXTERNAL. SASL says EXTERNAL means "use credentials -%% established by means external to the mechanism". We define that to -%% mean the peer certificate's subject's CN. - -description() -> - [{name, <<"EXTERNAL">>}, - {description, <<"SASL EXTERNAL authentication mechanism">>}]. - -init(Sock) -> - Username = case rabbit_net:peercert(Sock) of - {ok, C} -> - CN = case rabbit_ssl:peer_cert_subject_item( - C, ?'id-at-commonName') of - not_found -> {refused, "no CN found", []}; - CN0 -> list_to_binary(CN0) - end, - case config_sane() of - true -> CN; - false -> {refused, "configuration unsafe", []} - end; - {error, no_peercert} -> - {refused, "no peer certificate", []}; - nossl -> - {refused, "not SSL connection", []} - end, - #state{username = Username}. - -handle_response(_Response, #state{username = Username}) -> - case Username of - {refused, _, _} = E -> - E; - _ -> - case rabbit_access_control:check_user_login(Username, []) of - {ok, User} -> - {ok, User}; - {error, not_found} -> - %% This is not an information leak as we have to - %% have validated a client cert to get this far. - {refused, "user '~s' not found", [Username]} - end - end. - -%%-------------------------------------------------------------------------- - -config_sane() -> - {ok, Opts} = application:get_env(ssl_options), - case {proplists:get_value(fail_if_no_peer_cert, Opts), - proplists:get_value(verify, Opts)} of - {true, verify_peer} -> - true; - {F, V} -> - rabbit_log:warning("EXTERNAL mechanism disabled, " - "fail_if_no_peer_cert=~p; " - "verify=~p~n", [F, V]), - false - end. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 90fe230d..78ecb774 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -287,8 +287,10 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> handle_cast({confirm, MsgSeqNos, From}, State= #ch{stats_timer = StatsTimer}) -> case rabbit_event:stats_level(StatsTimer) of - fine -> {noreply, group_and_confirm(MsgSeqNos, From, State)}; - _ -> {noreply, nogroup_confirm(MsgSeqNos, From, State)} + fine -> + {noreply, group_and_confirm(MsgSeqNos, From, State), hibernate}; + _ -> + {noreply, nogroup_confirm(MsgSeqNos, From, State), hibernate} end. handle_info({'DOWN', _MRef, process, QPid, _Reason}, @@ -483,8 +485,7 @@ remove_queue_unconfirmed({MsgSeqNo, QX, Next}, QPid, Acc) -> group_and_confirm([], _QPid, State) -> State; group_and_confirm(MsgSeqNos, QPid, State) -> - {EMs, UC1} = - take_from_unconfirmed(MsgSeqNos, QPid, State), + {EMs, UC1} = take_from_unconfirmed(MsgSeqNos, QPid, State), confirm_grouped(EMs, State#ch{unconfirmed=UC1}). confirm_grouped(EMs, State) -> @@ -592,6 +593,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, _ -> add_tx_participants(DeliveredQPids, State2) end}; +handle_method(#'basic.nack'{delivery_tag = DeliveryTag, + multiple = Multiple, + requeue = Requeue}, + _, State) -> + reject(DeliveryTag, Requeue, Multiple, State); + handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, _, State = #ch{transaction_id = TxnKey, @@ -777,14 +784,8 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) -> handle_method(#'basic.reject'{delivery_tag = DeliveryTag, requeue = Requeue}, - _, State = #ch{unacked_message_q = UAMQ}) -> - {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, false), - ok = fold_per_queue( - fun (QPid, MsgIds, ok) -> - rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self()) - end, ok, Acked), - ok = notify_limiter(State#ch.limiter_pid, Acked), - {noreply, State#ch{unacked_message_q = Remaining}}; + _, State) -> + reject(DeliveryTag, Requeue, false, State); handle_method(#'exchange.declare'{exchange = ExchangeNameBin, type = TypeNameBin, @@ -1102,6 +1103,15 @@ basic_return(#basic_message{exchange_name = ExchangeName, routing_key = RoutingKey}, Content). +reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) -> + {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), + ok = fold_per_queue( + fun (QPid, MsgIds, ok) -> + rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self()) + end, ok, Acked), + ok = notify_limiter(State#ch.limiter_pid, Acked), + {noreply, State#ch{unacked_message_q = Remaining}}. + ack_record(DeliveryTag, ConsumerTag, _MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) -> {DeliveryTag, ConsumerTag, {QPid, MsgId}}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index d913092c..1709ef3c 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2092,12 +2092,13 @@ test_queue_recover() -> TxID = rabbit_guid:guid(), {new, #amqqueue { pid = QPid, name = QName }} = rabbit_amqqueue:declare(test_queue(), true, false, [], none), - Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>), - <<>>, #'P_basic'{delivery_mode = 2}, <<>>), - Delivery = #delivery{mandatory = false, immediate = false, txn = TxID, - sender = self(), message = Msg}, - [true = rabbit_amqqueue:deliver(QPid, Delivery) || - _ <- lists:seq(1, Count)], + [begin + Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>), + <<>>, #'P_basic'{delivery_mode = 2}, <<>>), + Delivery = #delivery{mandatory = false, immediate = false, txn = TxID, + sender = self(), message = Msg}, + true = rabbit_amqqueue:deliver(QPid, Delivery) + end || _ <- lists:seq(1, Count)], rabbit_amqqueue:commit_all([QPid], TxID, self()), exit(QPid, kill), MRef = erlang:monitor(process, QPid), |