summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-01-15 14:40:23 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-01-15 14:40:23 +0000
commit0c51466cf98bd314498b321618f5c3feb57564ac (patch)
tree1e2720a585f1bc186117a2b0049904540aec601b
parentf34868c0799a11fc874380a8931ccfd13e915f62 (diff)
parentc402e96e050192962e85c018c06844ed1670903e (diff)
downloadrabbitmq-server-0c51466cf98bd314498b321618f5c3feb57564ac.tar.gz
merging default into bug23631
-rw-r--r--src/rabbit_auth_mechanism_external.erl107
-rw-r--r--src/rabbit_channel.erl99
-rw-r--r--src/rabbit_variable_queue.erl8
3 files changed, 59 insertions, 155 deletions
diff --git a/src/rabbit_auth_mechanism_external.erl b/src/rabbit_auth_mechanism_external.erl
deleted file mode 100644
index 1c4e5c15..00000000
--- a/src/rabbit_auth_mechanism_external.erl
+++ /dev/null
@@ -1,107 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2010 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2010 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_auth_mechanism_external).
--include("rabbit.hrl").
-
--behaviour(rabbit_auth_mechanism).
-
--export([description/0, init/1, handle_response/2]).
-
--include("rabbit_auth_mechanism_spec.hrl").
-
--include_lib("public_key/include/public_key.hrl").
-
--rabbit_boot_step({?MODULE,
- [{description, "auth mechanism external"},
- {mfa, {rabbit_registry, register,
- [auth_mechanism, <<"EXTERNAL">>, ?MODULE]}},
- {requires, rabbit_registry},
- {enables, kernel_ready}]}).
-
--record(state, {username = undefined}).
-
-%% SASL EXTERNAL. SASL says EXTERNAL means "use credentials
-%% established by means external to the mechanism". We define that to
-%% mean the peer certificate's subject's CN.
-
-description() ->
- [{name, <<"EXTERNAL">>},
- {description, <<"SASL EXTERNAL authentication mechanism">>}].
-
-init(Sock) ->
- Username = case rabbit_net:peercert(Sock) of
- {ok, C} ->
- CN = case rabbit_ssl:peer_cert_subject_item(
- C, ?'id-at-commonName') of
- not_found -> {refused, "no CN found", []};
- CN0 -> list_to_binary(CN0)
- end,
- case config_sane() of
- true -> CN;
- false -> {refused, "configuration unsafe", []}
- end;
- {error, no_peercert} ->
- {refused, "no peer certificate", []};
- nossl ->
- {refused, "not SSL connection", []}
- end,
- #state{username = Username}.
-
-handle_response(_Response, #state{username = Username}) ->
- case Username of
- {refused, _, _} = E ->
- E;
- _ ->
- case rabbit_access_control:check_user_login(Username, []) of
- {ok, User} ->
- {ok, User};
- {error, not_found} ->
- %% This is not an information leak as we have to
- %% have validated a client cert to get this far.
- {refused, "user '~s' not found", [Username]}
- end
- end.
-
-%%--------------------------------------------------------------------------
-
-config_sane() ->
- {ok, Opts} = application:get_env(ssl_options),
- case {proplists:get_value(fail_if_no_peer_cert, Opts),
- proplists:get_value(verify, Opts)} of
- {true, verify_peer} ->
- true;
- {F, V} ->
- rabbit_log:warning("EXTERNAL mechanism disabled, "
- "fail_if_no_peer_cert=~p; "
- "verify=~p~n", [F, V]),
- false
- end.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 7b5f096b..1e909686 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -49,7 +49,7 @@
uncommitted_ack_q, unacked_message_q,
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
- confirm_enabled, publish_seqno, unconfirmed, queues_for_msg}).
+ confirm_enabled, publish_seqno, unconfirmed}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -186,8 +186,7 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
stats_timer = StatsTimer,
confirm_enabled = false,
publish_seqno = 1,
- unconfirmed = gb_sets:new(),
- queues_for_msg = dict:new()},
+ unconfirmed = gb_trees:empty()},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
rabbit_event:if_enabled(StatsTimer,
fun() -> internal_emit_stats(State) end),
@@ -287,16 +286,14 @@ handle_cast({confirm, MsgSeqNos, From}, State) ->
{noreply, confirm(MsgSeqNos, From, State)}.
handle_info({'DOWN', _MRef, process, QPid, _Reason},
- State = #ch{queues_for_msg = QFM}) ->
- State1 = dict:fold(
- fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) ->
- Qs = sets:del_element(QPid, QPids),
- case sets:size(Qs) of
- 0 -> confirm([Msg], QPid, State0);
- _ -> State0#ch{queues_for_msg =
- dict:store(Msg, Qs, QFM0)}
- end
- end, State, QFM),
+ State = #ch{unconfirmed = UC}) ->
+ %% TODO: this does a complete scan and partial rebuild of the
+ %% tree, which is quite efficient. To do better we'd need to
+ %% maintain a secondary mapping, from QPids to MsgSeqNos.
+ {MsgSeqNos, UC1} = remove_queue_unconfirmed(
+ gb_trees:next(gb_trees:iterator(UC)), QPid,
+ {[], UC}),
+ State1 = send_confirms(MsgSeqNos, State#ch{unconfirmed = UC1}),
erase_queue_stats(QPid),
{noreply, queue_blocked(QPid, State1), hibernate}.
@@ -471,30 +468,31 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
+remove_queue_unconfirmed(none, _QPid, Acc) ->
+ Acc;
+remove_queue_unconfirmed({MsgSeqNo, Qs, Next}, QPid, Acc) ->
+ remove_queue_unconfirmed(gb_trees:next(Next), QPid,
+ remove_qmsg(MsgSeqNo, QPid, Qs, Acc)).
+
confirm([], _QPid, State) ->
State;
-confirm(MsgSeqNos, QPid, State) ->
- {DoneMessages, State1} =
+confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
+ {DoneMessages, UC2} =
lists:foldl(
- fun(MsgSeqNo, {DMs, State0 = #ch{unconfirmed = UC0,
- queues_for_msg = QFM0}}) ->
- case gb_sets:is_element(MsgSeqNo, UC0) of
- false -> {DMs, State0};
- true -> Qs1 = sets:del_element(
- QPid, dict:fetch(MsgSeqNo, QFM0)),
- case sets:size(Qs1) of
- 0 -> {[MsgSeqNo | DMs],
- State0#ch{
- queues_for_msg =
- dict:erase(MsgSeqNo, QFM0),
- unconfirmed =
- gb_sets:delete(MsgSeqNo, UC0)}};
- _ -> QFM1 = dict:store(MsgSeqNo, Qs1, QFM0),
- {DMs, State0#ch{queues_for_msg = QFM1}}
- end
+ fun(MsgSeqNo, {_DMs, UC0} = Acc) ->
+ case gb_trees:lookup(MsgSeqNo, UC0) of
+ none -> Acc;
+ {value, Qs} -> remove_qmsg(MsgSeqNo, QPid, Qs, Acc)
end
- end, {[], State}, MsgSeqNos),
- send_confirms(DoneMessages, State1).
+ end, {[], UC}, MsgSeqNos),
+ send_confirms(DoneMessages, State#ch{unconfirmed = UC2}).
+
+remove_qmsg(MsgSeqNo, QPid, Qs, {MsgSeqNos, UC}) ->
+ Qs1 = sets:del_element(QPid, Qs),
+ case sets:size(Qs1) of
+ 0 -> {[MsgSeqNo | MsgSeqNos], gb_trees:delete(MsgSeqNo, UC)};
+ _ -> {MsgSeqNos, gb_trees:update(MsgSeqNo, Qs1, UC)}
+ end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
{reply, #'channel.open_ok'{}, State#ch{state = running}};
@@ -556,6 +554,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
_ -> add_tx_participants(DeliveredQPids, State2)
end};
+handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = Multiple,
+ requeue = Requeue},
+ _, State) ->
+ reject(DeliveryTag, Requeue, Multiple, State);
+
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
_, State = #ch{transaction_id = TxnKey,
@@ -741,14 +745,8 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
handle_method(#'basic.reject'{delivery_tag = DeliveryTag,
requeue = Requeue},
- _, State = #ch{unacked_message_q = UAMQ}) ->
- {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, false),
- ok = fold_per_queue(
- fun (QPid, MsgIds, ok) ->
- rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
- end, ok, Acked),
- ok = notify_limiter(State#ch.limiter_pid, Acked),
- {noreply, State#ch{unacked_message_q = Remaining}};
+ _, State) ->
+ reject(DeliveryTag, Requeue, false, State);
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
type = TypeNameBin,
@@ -1066,6 +1064,15 @@ basic_return(#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey},
Content).
+reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) ->
+ {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
+ ok = fold_per_queue(
+ fun (QPid, MsgIds, ok) ->
+ rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
+ end, ok, Acked),
+ ok = notify_limiter(State#ch.limiter_pid, Acked),
+ {noreply, State#ch{unacked_message_q = Remaining}}.
+
ack_record(DeliveryTag, ConsumerTag,
_MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) ->
{DeliveryTag, ConsumerTag, {QPid, MsgId}}.
@@ -1215,10 +1222,10 @@ process_routing_result(routed, [], MsgSeqNo, _, State) ->
process_routing_result(routed, _, undefined, _, State) ->
State;
process_routing_result(routed, QPids, MsgSeqNo, _, State) ->
- #ch{queues_for_msg = QFM, unconfirmed = UC} = State,
+ #ch{unconfirmed = UC} = State,
[maybe_monitor(QPid) || QPid <- QPids],
- State#ch{queues_for_msg = dict:store(MsgSeqNo, sets:from_list(QPids), QFM),
- unconfirmed = gb_sets:add(MsgSeqNo, UC)}.
+ UC1 = gb_trees:insert(MsgSeqNo, sets:from_list(QPids), UC),
+ State#ch{unconfirmed = UC1}.
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};
@@ -1232,9 +1239,9 @@ send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) ->
State;
send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
SCs = lists:usort(Cs),
- CutOff = case gb_sets:is_empty(UC) of
+ CutOff = case gb_trees:is_empty(UC) of
true -> lists:last(SCs) + 1;
- false -> gb_sets:smallest(UC)
+ false -> {SeqNo, _Qs} = gb_trees:smallest(UC), SeqNo
end,
{Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SCs),
case Ms of
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c14d5b8a..9f02b6b7 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -710,10 +710,14 @@ tx_commit(Txn, Fun, MsgPropsFun,
end)}.
requeue(AckTags, MsgPropsFun, State) ->
+ MsgPropsFun1 = fun (MsgProps) ->
+ (MsgPropsFun(MsgProps)) #message_properties {
+ needs_confirming = false }
+ end,
a(reduce_memory_use(
ack(fun msg_store_release/3,
fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) ->
- {_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps),
+ {_SeqId, State2} = publish(Msg, MsgPropsFun1(MsgProps),
true, false, State1),
State2;
({IsPersistent, Guid, MsgProps}, State1) ->
@@ -721,7 +725,7 @@ requeue(AckTags, MsgPropsFun, State) ->
{{ok, Msg = #basic_message{}}, MSCState1} =
msg_store_read(MSCState, IsPersistent, Guid),
State2 = State1 #vqstate { msg_store_clients = MSCState1 },
- {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProps),
+ {_SeqId, State3} = publish(Msg, MsgPropsFun1(MsgProps),
true, true, State2),
State3
end,