diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-01-15 14:40:23 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-01-15 14:40:23 +0000 |
commit | 0c51466cf98bd314498b321618f5c3feb57564ac (patch) | |
tree | 1e2720a585f1bc186117a2b0049904540aec601b | |
parent | f34868c0799a11fc874380a8931ccfd13e915f62 (diff) | |
parent | c402e96e050192962e85c018c06844ed1670903e (diff) | |
download | rabbitmq-server-0c51466cf98bd314498b321618f5c3feb57564ac.tar.gz |
merging default into bug23631
-rw-r--r-- | src/rabbit_auth_mechanism_external.erl | 107 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 99 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 8 |
3 files changed, 59 insertions, 155 deletions
diff --git a/src/rabbit_auth_mechanism_external.erl b/src/rabbit_auth_mechanism_external.erl deleted file mode 100644 index 1c4e5c15..00000000 --- a/src/rabbit_auth_mechanism_external.erl +++ /dev/null @@ -1,107 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_auth_mechanism_external). --include("rabbit.hrl"). - --behaviour(rabbit_auth_mechanism). - --export([description/0, init/1, handle_response/2]). - --include("rabbit_auth_mechanism_spec.hrl"). - --include_lib("public_key/include/public_key.hrl"). - --rabbit_boot_step({?MODULE, - [{description, "auth mechanism external"}, - {mfa, {rabbit_registry, register, - [auth_mechanism, <<"EXTERNAL">>, ?MODULE]}}, - {requires, rabbit_registry}, - {enables, kernel_ready}]}). - --record(state, {username = undefined}). - -%% SASL EXTERNAL. SASL says EXTERNAL means "use credentials -%% established by means external to the mechanism". We define that to -%% mean the peer certificate's subject's CN. - -description() -> - [{name, <<"EXTERNAL">>}, - {description, <<"SASL EXTERNAL authentication mechanism">>}]. - -init(Sock) -> - Username = case rabbit_net:peercert(Sock) of - {ok, C} -> - CN = case rabbit_ssl:peer_cert_subject_item( - C, ?'id-at-commonName') of - not_found -> {refused, "no CN found", []}; - CN0 -> list_to_binary(CN0) - end, - case config_sane() of - true -> CN; - false -> {refused, "configuration unsafe", []} - end; - {error, no_peercert} -> - {refused, "no peer certificate", []}; - nossl -> - {refused, "not SSL connection", []} - end, - #state{username = Username}. - -handle_response(_Response, #state{username = Username}) -> - case Username of - {refused, _, _} = E -> - E; - _ -> - case rabbit_access_control:check_user_login(Username, []) of - {ok, User} -> - {ok, User}; - {error, not_found} -> - %% This is not an information leak as we have to - %% have validated a client cert to get this far. - {refused, "user '~s' not found", [Username]} - end - end. - -%%-------------------------------------------------------------------------- - -config_sane() -> - {ok, Opts} = application:get_env(ssl_options), - case {proplists:get_value(fail_if_no_peer_cert, Opts), - proplists:get_value(verify, Opts)} of - {true, verify_peer} -> - true; - {F, V} -> - rabbit_log:warning("EXTERNAL mechanism disabled, " - "fail_if_no_peer_cert=~p; " - "verify=~p~n", [F, V]), - false - end. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 7b5f096b..1e909686 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -49,7 +49,7 @@ uncommitted_ack_q, unacked_message_q, user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, - confirm_enabled, publish_seqno, unconfirmed, queues_for_msg}). + confirm_enabled, publish_seqno, unconfirmed}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -186,8 +186,7 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid, stats_timer = StatsTimer, confirm_enabled = false, publish_seqno = 1, - unconfirmed = gb_sets:new(), - queues_for_msg = dict:new()}, + unconfirmed = gb_trees:empty()}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, fun() -> internal_emit_stats(State) end), @@ -287,16 +286,14 @@ handle_cast({confirm, MsgSeqNos, From}, State) -> {noreply, confirm(MsgSeqNos, From, State)}. handle_info({'DOWN', _MRef, process, QPid, _Reason}, - State = #ch{queues_for_msg = QFM}) -> - State1 = dict:fold( - fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) -> - Qs = sets:del_element(QPid, QPids), - case sets:size(Qs) of - 0 -> confirm([Msg], QPid, State0); - _ -> State0#ch{queues_for_msg = - dict:store(Msg, Qs, QFM0)} - end - end, State, QFM), + State = #ch{unconfirmed = UC}) -> + %% TODO: this does a complete scan and partial rebuild of the + %% tree, which is quite efficient. To do better we'd need to + %% maintain a secondary mapping, from QPids to MsgSeqNos. + {MsgSeqNos, UC1} = remove_queue_unconfirmed( + gb_trees:next(gb_trees:iterator(UC)), QPid, + {[], UC}), + State1 = send_confirms(MsgSeqNos, State#ch{unconfirmed = UC1}), erase_queue_stats(QPid), {noreply, queue_blocked(QPid, State1), hibernate}. @@ -471,30 +468,31 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. +remove_queue_unconfirmed(none, _QPid, Acc) -> + Acc; +remove_queue_unconfirmed({MsgSeqNo, Qs, Next}, QPid, Acc) -> + remove_queue_unconfirmed(gb_trees:next(Next), QPid, + remove_qmsg(MsgSeqNo, QPid, Qs, Acc)). + confirm([], _QPid, State) -> State; -confirm(MsgSeqNos, QPid, State) -> - {DoneMessages, State1} = +confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> + {DoneMessages, UC2} = lists:foldl( - fun(MsgSeqNo, {DMs, State0 = #ch{unconfirmed = UC0, - queues_for_msg = QFM0}}) -> - case gb_sets:is_element(MsgSeqNo, UC0) of - false -> {DMs, State0}; - true -> Qs1 = sets:del_element( - QPid, dict:fetch(MsgSeqNo, QFM0)), - case sets:size(Qs1) of - 0 -> {[MsgSeqNo | DMs], - State0#ch{ - queues_for_msg = - dict:erase(MsgSeqNo, QFM0), - unconfirmed = - gb_sets:delete(MsgSeqNo, UC0)}}; - _ -> QFM1 = dict:store(MsgSeqNo, Qs1, QFM0), - {DMs, State0#ch{queues_for_msg = QFM1}} - end + fun(MsgSeqNo, {_DMs, UC0} = Acc) -> + case gb_trees:lookup(MsgSeqNo, UC0) of + none -> Acc; + {value, Qs} -> remove_qmsg(MsgSeqNo, QPid, Qs, Acc) end - end, {[], State}, MsgSeqNos), - send_confirms(DoneMessages, State1). + end, {[], UC}, MsgSeqNos), + send_confirms(DoneMessages, State#ch{unconfirmed = UC2}). + +remove_qmsg(MsgSeqNo, QPid, Qs, {MsgSeqNos, UC}) -> + Qs1 = sets:del_element(QPid, Qs), + case sets:size(Qs1) of + 0 -> {[MsgSeqNo | MsgSeqNos], gb_trees:delete(MsgSeqNo, UC)}; + _ -> {MsgSeqNos, gb_trees:update(MsgSeqNo, Qs1, UC)} + end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -556,6 +554,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, _ -> add_tx_participants(DeliveredQPids, State2) end}; +handle_method(#'basic.nack'{delivery_tag = DeliveryTag, + multiple = Multiple, + requeue = Requeue}, + _, State) -> + reject(DeliveryTag, Requeue, Multiple, State); + handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, _, State = #ch{transaction_id = TxnKey, @@ -741,14 +745,8 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) -> handle_method(#'basic.reject'{delivery_tag = DeliveryTag, requeue = Requeue}, - _, State = #ch{unacked_message_q = UAMQ}) -> - {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, false), - ok = fold_per_queue( - fun (QPid, MsgIds, ok) -> - rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self()) - end, ok, Acked), - ok = notify_limiter(State#ch.limiter_pid, Acked), - {noreply, State#ch{unacked_message_q = Remaining}}; + _, State) -> + reject(DeliveryTag, Requeue, false, State); handle_method(#'exchange.declare'{exchange = ExchangeNameBin, type = TypeNameBin, @@ -1066,6 +1064,15 @@ basic_return(#basic_message{exchange_name = ExchangeName, routing_key = RoutingKey}, Content). +reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) -> + {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), + ok = fold_per_queue( + fun (QPid, MsgIds, ok) -> + rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self()) + end, ok, Acked), + ok = notify_limiter(State#ch.limiter_pid, Acked), + {noreply, State#ch{unacked_message_q = Remaining}}. + ack_record(DeliveryTag, ConsumerTag, _MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) -> {DeliveryTag, ConsumerTag, {QPid, MsgId}}. @@ -1215,10 +1222,10 @@ process_routing_result(routed, [], MsgSeqNo, _, State) -> process_routing_result(routed, _, undefined, _, State) -> State; process_routing_result(routed, QPids, MsgSeqNo, _, State) -> - #ch{queues_for_msg = QFM, unconfirmed = UC} = State, + #ch{unconfirmed = UC} = State, [maybe_monitor(QPid) || QPid <- QPids], - State#ch{queues_for_msg = dict:store(MsgSeqNo, sets:from_list(QPids), QFM), - unconfirmed = gb_sets:add(MsgSeqNo, UC)}. + UC1 = gb_trees:insert(MsgSeqNo, sets:from_list(QPids), UC), + State#ch{unconfirmed = UC1}. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; @@ -1232,9 +1239,9 @@ send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) -> State; send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> SCs = lists:usort(Cs), - CutOff = case gb_sets:is_empty(UC) of + CutOff = case gb_trees:is_empty(UC) of true -> lists:last(SCs) + 1; - false -> gb_sets:smallest(UC) + false -> {SeqNo, _Qs} = gb_trees:smallest(UC), SeqNo end, {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SCs), case Ms of diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c14d5b8a..9f02b6b7 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -710,10 +710,14 @@ tx_commit(Txn, Fun, MsgPropsFun, end)}. requeue(AckTags, MsgPropsFun, State) -> + MsgPropsFun1 = fun (MsgProps) -> + (MsgPropsFun(MsgProps)) #message_properties { + needs_confirming = false } + end, a(reduce_memory_use( ack(fun msg_store_release/3, fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) -> - {_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps), + {_SeqId, State2} = publish(Msg, MsgPropsFun1(MsgProps), true, false, State1), State2; ({IsPersistent, Guid, MsgProps}, State1) -> @@ -721,7 +725,7 @@ requeue(AckTags, MsgPropsFun, State) -> {{ok, Msg = #basic_message{}}, MSCState1} = msg_store_read(MSCState, IsPersistent, Guid), State2 = State1 #vqstate { msg_store_clients = MSCState1 }, - {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProps), + {_SeqId, State3} = publish(Msg, MsgPropsFun1(MsgProps), true, true, State2), State3 end, |