summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-01-18 11:59:56 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-01-18 11:59:56 +0000
commitcfa09e5fd38baa616b01c002e7e9e34bdc437b56 (patch)
tree2bd6b9d81295d0b3c2e00febc6bc5b8eeb96076f
parent83ebec1bcf772c48c786e354a0c03db16d664706 (diff)
parentad5f36fdaee422405631e3d0d8ee1c3e7db3407c (diff)
downloadrabbitmq-server-cfa09e5fd38baa616b01c002e7e9e34bdc437b56.tar.gz
merge from default
-rw-r--r--src/rabbit.erl22
-rw-r--r--src/rabbit_amqqueue.erl44
-rw-r--r--src/rabbit_auth_mechanism_external.erl107
-rw-r--r--src/rabbit_channel.erl34
-rw-r--r--src/rabbit_tests.erl13
5 files changed, 67 insertions, 153 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 954e289b..3cfba03e 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -373,6 +373,14 @@ home_dir() ->
Other -> Other
end.
+config_files() ->
+ case init:get_argument(config) of
+ {ok, Files} -> [filename:absname(
+ filename:rootname(File, ".config") ++ ".config") ||
+ File <- Files];
+ error -> []
+ end.
+
%---------------------------------------------------------------------------
print_banner() ->
@@ -398,14 +406,24 @@ print_banner() ->
Settings = [{"node", node()},
{"app descriptor", app_location()},
{"home dir", home_dir()},
+ {"config file(s)", config_files()},
{"cookie hash", rabbit_misc:cookie_hash()},
{"log", log_location(kernel)},
{"sasl log", log_location(sasl)},
{"database dir", rabbit_mnesia:dir()},
{"erlang version", erlang:system_info(version)}],
DescrLen = 1 + lists:max([length(K) || {K, _V} <- Settings]),
- Format = "~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n",
- lists:foreach(fun ({K, V}) -> io:format(Format, [K, V]) end, Settings),
+ Format = fun (K, V) ->
+ io:format("~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n",
+ [K, V])
+ end,
+ lists:foreach(fun ({"config file(s)" = K, []}) ->
+ Format(K, "(none)");
+ ({"config file(s)" = K, [V0 | Vs]}) ->
+ Format(K, V0), [Format("", V) || V <- Vs];
+ ({K, V}) ->
+ Format(K, V)
+ end, Settings),
io:nl().
ensure_working_log_handlers() ->
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 20097a7d..9626e126 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -212,30 +212,23 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
Q1 -> Q1
end.
-internal_declare(Q = #amqqueue{name = QueueName}, Recover) ->
+internal_declare(Q, true) ->
+ rabbit_misc:execute_mnesia_tx_with_tail(
+ fun () -> ok = store_queue(Q), rabbit_misc:const(Q) end);
+internal_declare(Q = #amqqueue{name = QueueName}, false) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
- case Recover of
- true ->
- ok = store_queue(Q),
- rabbit_misc:const(Q);
- false ->
- case mnesia:wread({rabbit_queue, QueueName}) of
- [] ->
- case mnesia:read({rabbit_durable_queue,
- QueueName}) of
- [] -> ok = store_queue(Q),
- B = add_default_binding(Q),
- fun (Tx) ->
- B(Tx),
- Q
- end;
- [_] -> %% Q exists on stopped node
- rabbit_misc:const(not_found)
- end;
- [ExistingQ] ->
- rabbit_misc:const(ExistingQ)
- end
+ case mnesia:wread({rabbit_queue, QueueName}) of
+ [] ->
+ case mnesia:read({rabbit_durable_queue, QueueName}) of
+ [] -> ok = store_queue(Q),
+ B = add_default_binding(Q),
+ fun (Tx) -> B(Tx), Q end;
+ [_] -> %% Q exists on stopped node
+ rabbit_misc:const(not_found)
+ end;
+ [ExistingQ] ->
+ rabbit_misc:const(ExistingQ)
end
end).
@@ -494,10 +487,9 @@ on_node_down(Node) ->
end,
fun (Deletions, Tx) ->
rabbit_binding:process_deletions(
- lists:foldl(
- fun rabbit_binding:combine_deletions/2,
- rabbit_binding:new_deletions(),
- Deletions),
+ lists:foldl(fun rabbit_binding:combine_deletions/2,
+ rabbit_binding:new_deletions(),
+ Deletions),
Tx)
end).
diff --git a/src/rabbit_auth_mechanism_external.erl b/src/rabbit_auth_mechanism_external.erl
deleted file mode 100644
index 1c4e5c15..00000000
--- a/src/rabbit_auth_mechanism_external.erl
+++ /dev/null
@@ -1,107 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2010 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2010 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_auth_mechanism_external).
--include("rabbit.hrl").
-
--behaviour(rabbit_auth_mechanism).
-
--export([description/0, init/1, handle_response/2]).
-
--include("rabbit_auth_mechanism_spec.hrl").
-
--include_lib("public_key/include/public_key.hrl").
-
--rabbit_boot_step({?MODULE,
- [{description, "auth mechanism external"},
- {mfa, {rabbit_registry, register,
- [auth_mechanism, <<"EXTERNAL">>, ?MODULE]}},
- {requires, rabbit_registry},
- {enables, kernel_ready}]}).
-
--record(state, {username = undefined}).
-
-%% SASL EXTERNAL. SASL says EXTERNAL means "use credentials
-%% established by means external to the mechanism". We define that to
-%% mean the peer certificate's subject's CN.
-
-description() ->
- [{name, <<"EXTERNAL">>},
- {description, <<"SASL EXTERNAL authentication mechanism">>}].
-
-init(Sock) ->
- Username = case rabbit_net:peercert(Sock) of
- {ok, C} ->
- CN = case rabbit_ssl:peer_cert_subject_item(
- C, ?'id-at-commonName') of
- not_found -> {refused, "no CN found", []};
- CN0 -> list_to_binary(CN0)
- end,
- case config_sane() of
- true -> CN;
- false -> {refused, "configuration unsafe", []}
- end;
- {error, no_peercert} ->
- {refused, "no peer certificate", []};
- nossl ->
- {refused, "not SSL connection", []}
- end,
- #state{username = Username}.
-
-handle_response(_Response, #state{username = Username}) ->
- case Username of
- {refused, _, _} = E ->
- E;
- _ ->
- case rabbit_access_control:check_user_login(Username, []) of
- {ok, User} ->
- {ok, User};
- {error, not_found} ->
- %% This is not an information leak as we have to
- %% have validated a client cert to get this far.
- {refused, "user '~s' not found", [Username]}
- end
- end.
-
-%%--------------------------------------------------------------------------
-
-config_sane() ->
- {ok, Opts} = application:get_env(ssl_options),
- case {proplists:get_value(fail_if_no_peer_cert, Opts),
- proplists:get_value(verify, Opts)} of
- {true, verify_peer} ->
- true;
- {F, V} ->
- rabbit_log:warning("EXTERNAL mechanism disabled, "
- "fail_if_no_peer_cert=~p; "
- "verify=~p~n", [F, V]),
- false
- end.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 90fe230d..78ecb774 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -287,8 +287,10 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
handle_cast({confirm, MsgSeqNos, From},
State= #ch{stats_timer = StatsTimer}) ->
case rabbit_event:stats_level(StatsTimer) of
- fine -> {noreply, group_and_confirm(MsgSeqNos, From, State)};
- _ -> {noreply, nogroup_confirm(MsgSeqNos, From, State)}
+ fine ->
+ {noreply, group_and_confirm(MsgSeqNos, From, State), hibernate};
+ _ ->
+ {noreply, nogroup_confirm(MsgSeqNos, From, State), hibernate}
end.
handle_info({'DOWN', _MRef, process, QPid, _Reason},
@@ -483,8 +485,7 @@ remove_queue_unconfirmed({MsgSeqNo, QX, Next}, QPid, Acc) ->
group_and_confirm([], _QPid, State) ->
State;
group_and_confirm(MsgSeqNos, QPid, State) ->
- {EMs, UC1} =
- take_from_unconfirmed(MsgSeqNos, QPid, State),
+ {EMs, UC1} = take_from_unconfirmed(MsgSeqNos, QPid, State),
confirm_grouped(EMs, State#ch{unconfirmed=UC1}).
confirm_grouped(EMs, State) ->
@@ -592,6 +593,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
_ -> add_tx_participants(DeliveredQPids, State2)
end};
+handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = Multiple,
+ requeue = Requeue},
+ _, State) ->
+ reject(DeliveryTag, Requeue, Multiple, State);
+
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
_, State = #ch{transaction_id = TxnKey,
@@ -777,14 +784,8 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
handle_method(#'basic.reject'{delivery_tag = DeliveryTag,
requeue = Requeue},
- _, State = #ch{unacked_message_q = UAMQ}) ->
- {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, false),
- ok = fold_per_queue(
- fun (QPid, MsgIds, ok) ->
- rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
- end, ok, Acked),
- ok = notify_limiter(State#ch.limiter_pid, Acked),
- {noreply, State#ch{unacked_message_q = Remaining}};
+ _, State) ->
+ reject(DeliveryTag, Requeue, false, State);
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
type = TypeNameBin,
@@ -1102,6 +1103,15 @@ basic_return(#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey},
Content).
+reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) ->
+ {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
+ ok = fold_per_queue(
+ fun (QPid, MsgIds, ok) ->
+ rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
+ end, ok, Acked),
+ ok = notify_limiter(State#ch.limiter_pid, Acked),
+ {noreply, State#ch{unacked_message_q = Remaining}}.
+
ack_record(DeliveryTag, ConsumerTag,
_MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) ->
{DeliveryTag, ConsumerTag, {QPid, MsgId}}.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index d913092c..1709ef3c 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2092,12 +2092,13 @@ test_queue_recover() ->
TxID = rabbit_guid:guid(),
{new, #amqqueue { pid = QPid, name = QName }} =
rabbit_amqqueue:declare(test_queue(), true, false, [], none),
- Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
- <<>>, #'P_basic'{delivery_mode = 2}, <<>>),
- Delivery = #delivery{mandatory = false, immediate = false, txn = TxID,
- sender = self(), message = Msg},
- [true = rabbit_amqqueue:deliver(QPid, Delivery) ||
- _ <- lists:seq(1, Count)],
+ [begin
+ Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
+ <<>>, #'P_basic'{delivery_mode = 2}, <<>>),
+ Delivery = #delivery{mandatory = false, immediate = false, txn = TxID,
+ sender = self(), message = Msg},
+ true = rabbit_amqqueue:deliver(QPid, Delivery)
+ end || _ <- lists:seq(1, Count)],
rabbit_amqqueue:commit_all([QPid], TxID, self()),
exit(QPid, kill),
MRef = erlang:monitor(process, QPid),