summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVlad Alexandru Ionescu <vlad@rabbitmq.com>2011-01-27 13:49:27 +0000
committerVlad Alexandru Ionescu <vlad@rabbitmq.com>2011-01-27 13:49:27 +0000
commit2d286f26295a4a11141b572d3bb9183b9dc3db3e (patch)
tree626ae04e659009570f5c853cdfb5a653a774303d
parent37692ccb4afc9545807ff438ebae8d3d83602e40 (diff)
parentba418f0f7c06821af7ce7d3719f35895830c1acd (diff)
downloadrabbitmq-server-2d286f26295a4a11141b572d3bb9183b9dc3db3e.tar.gz
merging in from default
-rw-r--r--docs/rabbitmq-server.1.xml5
-rw-r--r--docs/rabbitmq-service.xml5
-rw-r--r--docs/rabbitmqctl.1.xml10
-rw-r--r--ebin/rabbit_app.in2
-rw-r--r--include/rabbit_backing_queue_spec.hrl15
-rwxr-xr-xscripts/rabbitmq-server2
-rw-r--r--scripts/rabbitmq-server.bat2
-rw-r--r--scripts/rabbitmq-service.bat2
-rw-r--r--src/rabbit_amqqueue.erl26
-rw-r--r--src/rabbit_amqqueue_process.erl54
-rw-r--r--src/rabbit_auth_mechanism_plain.erl35
-rw-r--r--src/rabbit_binding.erl2
-rw-r--r--src/rabbit_channel.erl78
-rw-r--r--src/rabbit_control.erl25
-rw-r--r--src/rabbit_misc.erl17
-rw-r--r--src/rabbit_msg_store.erl103
-rw-r--r--src/rabbit_multi.erl2
-rw-r--r--src/rabbit_networking.erl226
-rw-r--r--src/rabbit_reader.erl2
-rw-r--r--src/rabbit_variable_queue.erl19
-rw-r--r--src/supervisor2.erl6
-rw-r--r--src/tcp_acceptor.erl4
-rw-r--r--src/tcp_listener.erl9
-rw-r--r--src/test_sup.erl14
24 files changed, 445 insertions, 220 deletions
diff --git a/docs/rabbitmq-server.1.xml b/docs/rabbitmq-server.1.xml
index 687a9c39..f161a291 100644
--- a/docs/rabbitmq-server.1.xml
+++ b/docs/rabbitmq-server.1.xml
@@ -83,8 +83,9 @@ machine guide</ulink> for details.
<term>RABBITMQ_NODE_IP_ADDRESS</term>
<listitem>
<para>
-Defaults to 0.0.0.0. This can be changed if you only want to bind to
-one network interface.
+By default RabbitMQ will bind to all interfaces, on IPv4 and IPv6 if
+available. Set this if you only want to bind to one network interface
+or address family.
</para>
</listitem>
</varlistentry>
diff --git a/docs/rabbitmq-service.xml b/docs/rabbitmq-service.xml
index e95f9889..3368960b 100644
--- a/docs/rabbitmq-service.xml
+++ b/docs/rabbitmq-service.xml
@@ -165,8 +165,9 @@ machine guide</ulink> for details.
<term>RABBITMQ_NODE_IP_ADDRESS</term>
<listitem>
<para>
-Defaults to 0.0.0.0. This can be changed if you only want to bind to
-one network interface.
+By default RabbitMQ will bind to all interfaces, on IPv4 and IPv6 if
+available. Set this if you only want to bind to one network interface
+or address family.
</para>
</listitem>
</varlistentry>
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 2152cab3..bd9fee7d 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1191,6 +1191,16 @@
messages to the channel's consumers.
</para></listitem>
</varlistentry>
+ <varlistentry>
+ <term>confirm</term>
+ <listitem><para>True if the channel is in confirm mode, false otherwise.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>messages_unconfirmed</term>
+ <listitem><para>Number of published messages not yet
+ confirmed. On channels not in confirm mode, this
+ remains 0.</para></listitem>
+ </varlistentry>
</variablelist>
<para>
If no <command>channelinfoitem</command>s are specified then pid,
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index bb0fbcd3..cc7221d6 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -14,7 +14,7 @@
%% we also depend on crypto, public_key and ssl but they shouldn't be
%% in here as we don't actually want to start it
{mod, {rabbit, []}},
- {env, [{tcp_listeners, [{"0.0.0.0", 5672}]},
+ {env, [{tcp_listeners, [5672]},
{ssl_listeners, []},
{ssl_options, []},
{vm_memory_high_watermark, 0.4},
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 296bfdb3..accb2c0e 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -14,14 +14,13 @@
%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
%%
--type(fetch_result() ::
+-type(fetch_result(Ack) ::
('empty' |
%% Message, IsDelivered, AckTag, Remaining_Len
- {rabbit_types:basic_message(), boolean(), ack(), non_neg_integer()})).
+ {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})).
-type(is_durable() :: boolean()).
-type(attempt_recovery() :: boolean()).
-type(purged_msg_count() :: non_neg_integer()).
--type(ack_required() :: boolean()).
-type(confirm_required() :: boolean()).
-type(message_properties_transformer() ::
fun ((rabbit_types:message_properties())
@@ -36,13 +35,17 @@
-spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
-spec(publish/3 :: (rabbit_types:basic_message(),
rabbit_types:message_properties(), state()) -> state()).
--spec(publish_delivered/4 :: (ack_required(), rabbit_types:basic_message(),
+-spec(publish_delivered/4 :: (true, rabbit_types:basic_message(),
rabbit_types:message_properties(), state())
- -> {ack(), state()}).
+ -> {ack(), state()};
+ (false, rabbit_types:basic_message(),
+ rabbit_types:message_properties(), state())
+ -> {undefined, state()}).
-spec(dropwhile/2 ::
(fun ((rabbit_types:message_properties()) -> boolean()), state())
-> state()).
--spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}).
+-spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()};
+ (false, state()) -> {fetch_result(undefined), state()}).
-spec(ack/2 :: ([ack()], state()) -> state()).
-spec(tx_publish/4 :: (rabbit_types:txn(), rabbit_types:basic_message(),
rabbit_types:message_properties(), state()) -> state()).
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 447df510..5c390a51 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -25,7 +25,7 @@ SERVER_START_ARGS=
. `dirname $0`/rabbitmq-env
-DEFAULT_NODE_IP_ADDRESS=0.0.0.0
+DEFAULT_NODE_IP_ADDRESS=auto
DEFAULT_NODE_PORT=5672
[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" != "x$NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
[ "x" = "x$RABBITMQ_NODE_PORT" ] && [ "x" != "x$NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT}
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 2467f2b5..0cfa5ea8 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -37,7 +37,7 @@ if "!RABBITMQ_NODENAME!"=="" (
if "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
if not "!RABBITMQ_NODE_PORT!"=="" (
- set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
+ set RABBITMQ_NODE_IP_ADDRESS=auto
)
) else (
if "!RABBITMQ_NODE_PORT!"=="" (
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 49b6b9ce..43520b55 100644
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -43,7 +43,7 @@ if "!RABBITMQ_NODENAME!"=="" (
if "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
if not "!RABBITMQ_NODE_PORT!"=="" (
- set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
+ set RABBITMQ_NODE_IP_ADDRESS=auto
)
) else (
if "!RABBITMQ_NODE_PORT!"=="" (
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index ad9e3ce6..a6da551d 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -137,7 +137,9 @@
-> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())).
-spec(internal_delete/1 ::
(name()) -> rabbit_types:ok_or_error('not_found') |
- rabbit_types:connection_exit()).
+ rabbit_types:connection_exit() |
+ fun ((boolean()) -> rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit())).
-spec(maybe_run_queue_via_backing_queue/2 ::
(pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
-spec(maybe_run_queue_via_backing_queue_async/2 ::
@@ -215,8 +217,12 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
[_] -> %% Q exists on stopped node
rabbit_misc:const(not_found)
end;
- [ExistingQ] ->
- rabbit_misc:const(ExistingQ)
+ [ExistingQ = #amqqueue{pid = QPid}] ->
+ case is_process_alive(QPid) of
+ true -> rabbit_misc:const(ExistingQ);
+ false -> TailFun = internal_delete(QueueName),
+ fun (Tx) -> TailFun(Tx), ExistingQ end
+ end
end
end).
@@ -432,17 +438,15 @@ internal_delete1(QueueName) ->
rabbit_binding:remove_for_destination(QueueName).
internal_delete(QueueName) ->
- rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
- [] -> {error, not_found};
- [_] -> internal_delete1(QueueName)
+ [] -> rabbit_misc:const({error, not_found});
+ [_] -> Deletions = internal_delete1(QueueName),
+ fun (Tx) -> ok = rabbit_binding:process_deletions(
+ Deletions, Tx)
+ end
end
- end,
- fun ({error, _} = Err, _Tx) ->
- Err;
- (Deletions, Tx) ->
- ok = rabbit_binding:process_deletions(Deletions, Tx)
end).
maybe_run_queue_via_backing_queue(QPid, Fun) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 663977ba..3418c663 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -332,11 +332,6 @@ ch_record_state_transition(OldCR, NewCR) ->
true -> ok
end.
-record_current_channel_tx(ChPid, Txn) ->
- %% as a side effect this also starts monitoring the channel (if
- %% that wasn't happening already)
- store_ch_record((ch_record(ChPid))#cr{txn = Txn}).
-
deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
State = #q{q = #amqqueue{name = QName},
active_consumers = ActiveConsumers,
@@ -495,7 +490,7 @@ attempt_delivery(#delivery{txn = Txn,
{NeedsConfirming,
State = #q{backing_queue = BQ,
backing_queue_state = BQS}}) ->
- record_current_channel_tx(ChPid, Txn),
+ store_ch_record((ch_record(ChPid))#cr{txn = Txn}),
{true,
NeedsConfirming,
State#q{backing_queue_state =
@@ -591,7 +586,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
true -> {stop, State1};
false -> State2 = case Txn of
none -> State1;
- _ -> rollback_transaction(Txn, ChPid,
+ _ -> rollback_transaction(Txn, C,
State1)
end,
{ok, requeue_and_run(sets:to_list(ChAckTags),
@@ -627,26 +622,23 @@ maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
run_message_queue(
confirm_messages(Guids, State#q{backing_queue_state = BQS1})).
-commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
- backing_queue_state = BQS,
- ttl = TTL}) ->
- {AckTags, BQS1} = BQ:tx_commit(Txn,
- fun () -> gen_server2:reply(From, ok) end,
- reset_msg_expiry_fun(TTL),
- BQS),
- %% ChPid must be known here because of the participant management
- %% by the channel.
- C = #cr{acktags = ChAckTags} = lookup_ch(ChPid),
+commit_transaction(Txn, From, C = #cr{acktags = ChAckTags},
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ ttl = TTL}) ->
+ {AckTags, BQS1} = BQ:tx_commit(
+ Txn, fun () -> gen_server2:reply(From, ok) end,
+ reset_msg_expiry_fun(TTL), BQS),
ChAckTags1 = subtract_acks(ChAckTags, AckTags),
maybe_store_ch_record(C#cr{acktags = ChAckTags1, txn = none}),
State#q{backing_queue_state = BQS1}.
-rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+rollback_transaction(Txn, C, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
{_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS),
%% Iff we removed acktags from the channel record on ack+txn then
- %% we would add them back in here (would also require ChPid)
- record_current_channel_tx(ChPid, none),
+ %% we would add them back in here.
+ maybe_store_ch_record(C#cr{txn = none}),
State#q{backing_queue_state = BQS1}.
subtract_acks(A, B) when is_list(B) ->
@@ -848,8 +840,11 @@ handle_call({deliver, Delivery}, From, State) ->
noreply(NewState);
handle_call({commit, Txn, ChPid}, From, State) ->
- NewState = commit_transaction(Txn, From, ChPid, State),
- noreply(run_message_queue(NewState));
+ case lookup_ch(ChPid) of
+ not_found -> reply(ok, State);
+ C -> noreply(run_message_queue(
+ commit_transaction(Txn, From, C, State)))
+ end;
handle_call({notify_down, ChPid}, _From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
@@ -1048,7 +1043,10 @@ handle_cast({reject, AckTags, Requeue, ChPid},
end;
handle_cast({rollback, Txn, ChPid}, State) ->
- noreply(rollback_transaction(Txn, ChPid, State));
+ noreply(case lookup_ch(ChPid) of
+ not_found -> State;
+ C -> rollback_transaction(Txn, C, State)
+ end);
handle_cast(delete_immediately, State) ->
{stop, normal, State};
@@ -1151,15 +1149,15 @@ handle_pre_hibernate(State = #q{backing_queue_state = undefined}) ->
handle_pre_hibernate(State = #q{backing_queue = BQ,
backing_queue_state = BQS,
stats_timer = StatsTimer}) ->
- BQS1 = BQ:handle_pre_hibernate(BQS),
- %% no activity for a while == 0 egress and ingress rates
+ {RamDuration, BQS1} = BQ:ram_duration(BQS),
DesiredDuration =
- rabbit_memory_monitor:report_ram_duration(self(), infinity),
+ rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
+ BQS3 = BQ:handle_pre_hibernate(BQS2),
rabbit_event:if_enabled(StatsTimer,
fun () ->
emit_stats(State, [{idle_since, now()}])
end),
State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer),
- backing_queue_state = BQS2},
+ backing_queue_state = BQS3},
{hibernate, stop_rate_timer(State1)}.
diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl
index 7d9dcd20..1ca07018 100644
--- a/src/rabbit_auth_mechanism_plain.erl
+++ b/src/rabbit_auth_mechanism_plain.erl
@@ -33,6 +33,10 @@
%% SASL PLAIN, as used by the Qpid Java client and our clients. Also,
%% apparently, by OpenAMQ.
+%% TODO: once the minimum erlang becomes R13B03, reimplement this
+%% using the binary module - that makes use of BIFs to do binary
+%% matching and will thus be much faster.
+
description() ->
[{name, <<"PLAIN">>},
{description, <<"SASL PLAIN authentication mechanism">>}].
@@ -41,11 +45,32 @@ init(_Sock) ->
[].
handle_response(Response, _State) ->
- %% The '%%"' at the end of the next line is for Emacs
- case re:run(Response, "^\\0([^\\0]*)\\0([^\\0]*)$",%%"
- [{capture, all_but_first, binary}]) of
- {match, [User, Pass]} ->
+ case extract_user_pass(Response) of
+ {ok, User, Pass} ->
rabbit_access_control:check_user_pass_login(User, Pass);
- _ ->
+ error ->
{protocol_error, "response ~p invalid", [Response]}
end.
+
+extract_user_pass(Response) ->
+ case extract_elem(Response) of
+ {ok, User, Response1} -> case extract_elem(Response1) of
+ {ok, Pass, <<>>} -> {ok, User, Pass};
+ _ -> error
+ end;
+ error -> error
+ end.
+
+extract_elem(<<0:8, Rest/binary>>) ->
+ Count = next_null_pos(Rest),
+ <<Elem:Count/binary, Rest1/binary>> = Rest,
+ {ok, Elem, Rest1};
+extract_elem(_) ->
+ error.
+
+next_null_pos(Bin) ->
+ next_null_pos(Bin, 0).
+
+next_null_pos(<<>>, Count) -> Count;
+next_null_pos(<<0:8, _Rest/binary>>, Count) -> Count;
+next_null_pos(<<_:8, Rest/binary>>, Count) -> next_null_pos(Rest, Count + 1).
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index b270927b..96a22dca 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -331,7 +331,7 @@ group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings) ->
group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc), Removed).
maybe_auto_delete(XName, Bindings, Deletions) ->
- case mnesia:read(rabbit_exchange, XName) of
+ case mnesia:read({rabbit_exchange, XName}) of
[] ->
add_deletion(XName, {undefined, not_deleted, Bindings}, Deletions);
[X] ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b92206ad..91559ea6 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -41,8 +41,10 @@
-define(STATISTICS_KEYS,
[pid,
transactional,
+ confirm,
consumer_count,
messages_unacknowledged,
+ messages_unconfirmed,
acks_uncommitted,
prefetch_count,
client_flow_blocked]).
@@ -280,12 +282,12 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
%% TODO: this does a complete scan and partial rebuild of the
%% tree, which is quite efficient. To do better we'd need to
%% maintain a secondary mapping, from QPids to MsgSeqNos.
- {MsgSeqNos, UC1} = remove_queue_unconfirmed(
- gb_trees:next(gb_trees:iterator(UC)), QPid,
- {[], UC}),
+ {MXs, UC1} = remove_queue_unconfirmed(
+ gb_trees:next(gb_trees:iterator(UC)), QPid,
+ {[], UC}, State),
erase_queue_stats(QPid),
- noreply(queue_blocked(QPid, record_confirms(MsgSeqNos,
- State#ch{unconfirmed = UC1}))).
+ noreply(
+ queue_blocked(QPid, record_confirms(MXs, State#ch{unconfirmed = UC1}))).
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
@@ -471,38 +473,42 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-remove_queue_unconfirmed(none, _QPid, Acc) ->
+remove_queue_unconfirmed(none, _QPid, Acc, _State) ->
Acc;
-remove_queue_unconfirmed({MsgSeqNo, Qs, Next}, QPid, Acc) ->
+remove_queue_unconfirmed({MsgSeqNo, XQ, Next}, QPid, Acc, State) ->
remove_queue_unconfirmed(gb_trees:next(Next), QPid,
- remove_qmsg(MsgSeqNo, QPid, Qs, Acc)).
+ remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State),
+ State).
-record_confirm(undefined, State) -> State;
-record_confirm(MsgSeqNo, State) -> record_confirms([MsgSeqNo], State).
+record_confirm(undefined, _, State) ->
+ State;
+record_confirm(MsgSeqNo, XName, State) ->
+ record_confirms([{MsgSeqNo, XName}], State).
record_confirms([], State) ->
State;
-record_confirms(MsgSeqNos, State = #ch{confirmed = C}) ->
- State#ch{confirmed = [MsgSeqNos | C]}.
+record_confirms(MXs, State = #ch{confirmed = C}) ->
+ State#ch{confirmed = [MXs | C]}.
confirm([], _QPid, State) ->
State;
confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
- {DoneMessages, UC2} =
+ {MXs, UC1} =
lists:foldl(
fun(MsgSeqNo, {_DMs, UC0} = Acc) ->
case gb_trees:lookup(MsgSeqNo, UC0) of
none -> Acc;
- {value, Qs} -> remove_qmsg(MsgSeqNo, QPid, Qs, Acc)
+ {value, XQ} -> remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State)
end
end, {[], UC}, MsgSeqNos),
- record_confirms(DoneMessages, State#ch{unconfirmed = UC2}).
+ record_confirms(MXs, State#ch{unconfirmed = UC1}).
-remove_qmsg(MsgSeqNo, QPid, Qs, {MsgSeqNos, UC}) ->
+remove_qmsg(MsgSeqNo, QPid, {XName, Qs}, {MXs, UC}, State) ->
Qs1 = sets:del_element(QPid, Qs),
+ maybe_incr_stats([{{QPid, XName}, 1}], confirm, State),
case sets:size(Qs1) of
- 0 -> {[MsgSeqNo | MsgSeqNos], gb_trees:delete(MsgSeqNo, UC)};
- _ -> {MsgSeqNos, gb_trees:update(MsgSeqNo, Qs1, UC)}
+ 0 -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UC)};
+ _ -> {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UC)}
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
@@ -555,7 +561,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
Exchange,
rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message,
MsgSeqNo)),
- State2 = process_routing_result(RoutingRes, DeliveredQPids,
+ State2 = process_routing_result(RoutingRes, DeliveredQPids, ExchangeName,
MsgSeqNo, Message, State1),
maybe_incr_stats([{ExchangeName, 1} |
[{{QPid, ExchangeName}, 1} ||
@@ -1222,20 +1228,20 @@ is_message_persistent(Content) ->
IsPersistent
end.
-process_routing_result(unroutable, _, MsgSeqNo, Message, State) ->
- ok = basic_return(Message, State#ch.writer_pid, no_route),
- record_confirm(MsgSeqNo, State);
-process_routing_result(not_delivered, _, MsgSeqNo, Message, State) ->
- ok = basic_return(Message, State#ch.writer_pid, no_consumers),
- record_confirm(MsgSeqNo, State);
-process_routing_result(routed, [], MsgSeqNo, _, State) ->
- record_confirm(MsgSeqNo, State);
-process_routing_result(routed, _, undefined, _, State) ->
+process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
+ ok = basic_return(Msg, State#ch.writer_pid, no_route),
+ record_confirm(MsgSeqNo, XName, State);
+process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) ->
+ ok = basic_return(Msg, State#ch.writer_pid, no_consumers),
+ record_confirm(MsgSeqNo, XName, State);
+process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
+ record_confirm(MsgSeqNo, XName, State);
+process_routing_result(routed, _, _, undefined, _, State) ->
State;
-process_routing_result(routed, QPids, MsgSeqNo, _, State) ->
+process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
#ch{unconfirmed = UC} = State,
[maybe_monitor(QPid) || QPid <- QPids],
- UC1 = gb_trees:insert(MsgSeqNo, sets:from_list(QPids), UC),
+ UC1 = gb_trees:insert(MsgSeqNo, {XName, sets:from_list(QPids)}, UC),
State#ch{unconfirmed = UC1}.
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
@@ -1244,8 +1250,11 @@ lock_message(false, _MsgStruct, State) ->
State.
send_confirms(State = #ch{confirmed = C}) ->
- send_confirms(lists:append(C), State #ch{confirmed = []}).
-
+ C1 = lists:append(C),
+ MsgSeqNos = [ begin maybe_incr_stats([{ExchangeName, 1}], confirm, State),
+ MsgSeqNo
+ end || {MsgSeqNo, ExchangeName} <- C1 ],
+ send_confirms(MsgSeqNos, State #ch{confirmed = []}).
send_confirms([], State) ->
State;
send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) ->
@@ -1255,7 +1264,7 @@ send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
SCs = lists:usort(Cs),
CutOff = case gb_trees:is_empty(UC) of
true -> lists:last(SCs) + 1;
- false -> {SeqNo, _Qs} = gb_trees:smallest(UC), SeqNo
+ false -> {SeqNo, _XQ} = gb_trees:smallest(UC), SeqNo
end,
{Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SCs),
case Ms of
@@ -1283,8 +1292,11 @@ i(number, #ch{channel = Channel}) -> Channel;
i(user, #ch{user = User}) -> User#user.username;
i(vhost, #ch{virtual_host = VHost}) -> VHost;
i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none;
+i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
dict:size(ConsumerMapping);
+i(messages_unconfirmed, #ch{unconfirmed = UC}) ->
+ gb_trees:size(UC);
i(messages_unacknowledged, #ch{unacked_message_q = UAMQ,
uncommitted_ack_q = UAQ}) ->
queue:len(UAMQ) + queue:len(UAQ);
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 4228ff7f..80483097 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -77,24 +77,24 @@ start() ->
true -> ok;
false -> io:format("...done.~n")
end,
- halt();
+ quit(0);
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
print_error("invalid command '~s'",
[string:join([atom_to_list(Command) | Args], " ")]),
usage();
{error, Reason} ->
print_error("~p", [Reason]),
- halt(2);
+ quit(2);
{badrpc, {'EXIT', Reason}} ->
print_error("~p", [Reason]),
- halt(2);
+ quit(2);
{badrpc, Reason} ->
print_error("unable to connect to node ~w: ~w", [Node, Reason]),
print_badrpc_diagnostics(Node),
- halt(2);
+ quit(2);
Other ->
print_error("~p", [Other]),
- halt(2)
+ quit(2)
end.
fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args).
@@ -130,7 +130,7 @@ stop() ->
usage() ->
io:format("~s", [rabbit_ctl_usage:usage()]),
- halt(1).
+ quit(1).
action(stop, Node, [], _Opts, Inform) ->
Inform("Stopping and halting node ~p", [Node]),
@@ -327,11 +327,11 @@ format_info_item(#resource{name = Name}) ->
escape(Name);
format_info_item({N1, N2, N3, N4} = Value) when
?IS_U8(N1), ?IS_U8(N2), ?IS_U8(N3), ?IS_U8(N4) ->
- inet_parse:ntoa(Value);
+ rabbit_misc:ntoa(Value);
format_info_item({K1, K2, K3, K4, K5, K6, K7, K8} = Value) when
?IS_U16(K1), ?IS_U16(K2), ?IS_U16(K3), ?IS_U16(K4),
?IS_U16(K5), ?IS_U16(K6), ?IS_U16(K7), ?IS_U16(K8) ->
- inet_parse:ntoa(Value);
+ rabbit_misc:ntoa(Value);
format_info_item(Value) when is_pid(Value) ->
rabbit_misc:pid_to_string(Value);
format_info_item(Value) when is_binary(Value) ->
@@ -393,3 +393,12 @@ prettify_typed_amqp_value(Type, Value) ->
array -> [prettify_typed_amqp_value(T, V) || {T, V} <- Value];
_ -> Value
end.
+
+% the slower shutdown on windows required to flush stdout
+quit(Status) ->
+ case os:type() of
+ {unix, _} ->
+ halt(Status);
+ {win32, _} ->
+ init:stop(Status)
+ end.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 03317d70..3a4fb024 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -55,6 +55,7 @@
-export([now_ms/0]).
-export([lock_file/1]).
-export([const_ok/1, const/1]).
+-export([ntoa/1, ntoab/1]).
%%----------------------------------------------------------------------------
@@ -191,6 +192,8 @@
-spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')).
-spec(const_ok/1 :: (any()) -> 'ok').
-spec(const/1 :: (A) -> const(A)).
+-spec(ntoa/1 :: (inet:ip_address()) -> string()).
+-spec(ntoab/1 :: (inet:ip_address()) -> string()).
-endif.
@@ -832,3 +835,17 @@ lock_file(Path) ->
const_ok(_) -> ok.
const(X) -> fun (_) -> X end.
+
+%% Format IPv4-mapped IPv6 addresses as IPv4, since they're what we see
+%% when IPv6 is enabled but not used (i.e. 99% of the time).
+ntoa({0,0,0,0,0,16#ffff,AB,CD}) ->
+ inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256});
+ntoa(IP) ->
+ inet_parse:ntoa(IP).
+
+ntoab(IP) ->
+ Str = ntoa(IP),
+ case string:str(Str, ":") of
+ 0 -> Str;
+ _ -> "[" ++ Str ++ "]"
+ end.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 529e3e07..e9c356e1 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -738,45 +738,36 @@ handle_call({contains, Guid}, From, State) ->
handle_cast({client_dying, CRef},
State = #msstate { dying_clients = DyingClients }) ->
DyingClients1 = sets:add_element(CRef, DyingClients),
- write_message(CRef, <<>>, State #msstate { dying_clients = DyingClients1 });
+ noreply(write_message(CRef, <<>>,
+ State #msstate { dying_clients = DyingClients1 }));
handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) ->
State1 = State #msstate { clients = dict:erase(CRef, Clients) },
noreply(remove_message(CRef, CRef, clear_client(CRef, State1)));
handle_cast({write, CRef, Guid},
- State = #msstate { file_summary_ets = FileSummaryEts,
- cur_file_cache_ets = CurFileCacheEts }) ->
+ State = #msstate { cur_file_cache_ets = CurFileCacheEts }) ->
true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
[{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
- case should_mask_action(CRef, Guid, State) of
- {true, _Location} ->
- noreply(State);
- {false, not_found} ->
- write_message(CRef, Guid, Msg, State);
- {Mask, #msg_location { ref_count = 0, file = File,
- total_size = TotalSize }} ->
- case {Mask, ets:lookup(FileSummaryEts, File)} of
- {false, [#file_summary { locked = true }]} ->
- ok = index_delete(Guid, State),
- write_message(CRef, Guid, Msg, State);
- {false_if_increment, [#file_summary { locked = true }]} ->
- %% The msg for Guid is older than the client death
- %% message, but as it is being GC'd currently,
- %% we'll have to write a new copy, which will then
- %% be younger, so ignore this write.
- noreply(State);
- {_Mask, [#file_summary {}]} ->
- ok = index_update_ref_count(Guid, 1, State),
- State1 = client_confirm_if_on_disk(CRef, Guid, File, State),
- noreply(adjust_valid_total_size(File, TotalSize, State1))
- end;
- {_Mask, #msg_location { ref_count = RefCount, file = File }} ->
- %% We already know about it, just update counter. Only
- %% update field otherwise bad interaction with concurrent GC
- ok = index_update_ref_count(Guid, RefCount + 1, State),
- noreply(client_confirm_if_on_disk(CRef, Guid, File, State))
- end;
+ noreply(
+ case write_action(should_mask_action(CRef, Guid, State), Guid, State) of
+ {write, State1} ->
+ write_message(CRef, Guid, Msg, State1);
+ {ignore, CurFile, State1 = #msstate { current_file = CurFile }} ->
+ State1;
+ {ignore, _File, State1} ->
+ true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}),
+ State1;
+ {confirm, CurFile, State1 = #msstate { current_file = CurFile }}->
+ record_pending_confirm(CRef, Guid, State1);
+ {confirm, _File, State1} ->
+ true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}),
+ update_pending_confirms(
+ fun (MsgOnDiskFun, CTG) ->
+ MsgOnDiskFun(gb_sets:singleton(Guid), written),
+ CTG
+ end, CRef, State1)
+ end);
handle_cast({remove, CRef, Guids}, State) ->
State1 = lists:foldl(
@@ -924,6 +915,37 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
[client_confirm(CRef, Guids, written, State1) || {CRef, Guids} <- CGs],
State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }.
+write_action({true, not_found}, _Guid, State) ->
+ {ignore, undefined, State};
+write_action({true, #msg_location { file = File }}, _Guid, State) ->
+ {ignore, File, State};
+write_action({false, not_found}, _Guid, State) ->
+ {write, State};
+write_action({Mask, #msg_location { ref_count = 0, file = File,
+ total_size = TotalSize }},
+ Guid, State = #msstate { file_summary_ets = FileSummaryEts }) ->
+ case {Mask, ets:lookup(FileSummaryEts, File)} of
+ {false, [#file_summary { locked = true }]} ->
+ ok = index_delete(Guid, State),
+ {write, State};
+ {false_if_increment, [#file_summary { locked = true }]} ->
+ %% The msg for Guid is older than the client death
+ %% message, but as it is being GC'd currently we'll have
+ %% to write a new copy, which will then be younger, so
+ %% ignore this write.
+ {ignore, File, State};
+ {_Mask, [#file_summary {}]} ->
+ ok = index_update_ref_count(Guid, 1, State),
+ State1 = adjust_valid_total_size(File, TotalSize, State),
+ {confirm, File, State1}
+ end;
+write_action({_Mask, #msg_location { ref_count = RefCount, file = File }},
+ Guid, State) ->
+ ok = index_update_ref_count(Guid, RefCount + 1, State),
+ %% We already know about it, just update counter. Only update
+ %% field otherwise bad interaction with concurrent GC
+ {confirm, File, State}.
+
write_message(CRef, Guid, Msg, State) ->
write_message(Guid, Msg, record_pending_confirm(CRef, Guid, State)).
@@ -943,11 +965,10 @@ write_message(Guid, Msg,
[_,_] = ets:update_counter(FileSummaryEts, CurFile,
[{#file_summary.valid_total_size, TotalSize},
{#file_summary.file_size, TotalSize}]),
- NextOffset = CurOffset + TotalSize,
- noreply(maybe_roll_to_new_file(
- NextOffset, State #msstate {
- sum_valid_data = SumValid + TotalSize,
- sum_file_size = SumFileSize + TotalSize })).
+ maybe_roll_to_new_file(CurOffset + TotalSize,
+ State #msstate {
+ sum_valid_data = SumValid + TotalSize,
+ sum_file_size = SumFileSize + TotalSize }).
read_message(Guid, From,
State = #msstate { dedup_cache_ets = DedupCacheEts }) ->
@@ -1134,16 +1155,6 @@ record_pending_confirm(CRef, Guid, State) ->
gb_sets:singleton(Guid), CTG)
end, CRef, State).
-client_confirm_if_on_disk(CRef, Guid, CurFile,
- State = #msstate { current_file = CurFile }) ->
- record_pending_confirm(CRef, Guid, State);
-client_confirm_if_on_disk(CRef, Guid, _File, State) ->
- update_pending_confirms(
- fun (MsgOnDiskFun, CTG) ->
- MsgOnDiskFun(gb_sets:singleton(Guid), written),
- CTG
- end, CRef, State).
-
client_confirm(CRef, Guids, ActionTaken, State) ->
update_pending_confirms(
fun (MsgOnDiskFun, CTG) ->
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index 7c07c4fe..ebd7fe8a 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -336,6 +336,8 @@ get_node_tcp_listener() ->
case application:get_env(rabbit, tcp_listeners) of
{ok, [{_IpAddy, _Port} = Listener]} ->
Listener;
+ {ok, [Port]} when is_number(Port) ->
+ {"0.0.0.0", Port};
{ok, []} ->
undefined;
{ok, Other} ->
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 0a7d9dd7..283d25c7 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -16,15 +16,15 @@
-module(rabbit_networking).
--export([boot/0, start/0, start_tcp_listener/2, start_ssl_listener/3,
- stop_tcp_listener/2, on_node_down/1, active_listeners/0,
+-export([boot/0, start/0, start_tcp_listener/1, start_ssl_listener/2,
+ stop_tcp_listener/1, on_node_down/1, active_listeners/0,
node_listeners/1, connections/0, connection_info_keys/0,
connection_info/1, connection_info/2,
connection_info_all/0, connection_info_all/1,
close_connection/2]).
%%used by TCP-based transports, e.g. STOMP adapter
--export([check_tcp_listener_address/3]).
+-export([check_tcp_listener_address/2]).
-export([tcp_listener_started/3, tcp_listener_stopped/3,
start_client/1, start_ssl_client/2]).
@@ -44,17 +44,24 @@
-define(SSL_TIMEOUT, 5). %% seconds
+-define(FIRST_TEST_BIND_PORT, 10000).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-export_type([ip_port/0, hostname/0]).
+-type(family() :: atom()).
+-type(listener_config() :: ip_port() |
+ {hostname(), ip_port()} |
+ {hostname(), ip_port(), family()}).
+
-spec(start/0 :: () -> 'ok').
--spec(start_tcp_listener/2 :: (hostname(), ip_port()) -> 'ok').
--spec(start_ssl_listener/3 :: (hostname(), ip_port(), rabbit_types:infos())
- -> 'ok').
--spec(stop_tcp_listener/2 :: (hostname(), ip_port()) -> 'ok').
+-spec(start_tcp_listener/1 :: (listener_config()) -> 'ok').
+-spec(start_ssl_listener/2 ::
+ (listener_config(), rabbit_types:infos()) -> 'ok').
+-spec(stop_tcp_listener/1 :: (listener_config()) -> 'ok').
-spec(active_listeners/0 :: () -> [rabbit_types:listener()]).
-spec(node_listeners/1 :: (node()) -> [rabbit_types:listener()]).
-spec(connections/0 :: () -> [rabbit_types:connection()]).
@@ -69,8 +76,8 @@
(rabbit_types:info_keys()) -> [rabbit_types:infos()]).
-spec(close_connection/2 :: (pid(), string()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
--spec(check_tcp_listener_address/3 ::
- (atom(), hostname(), ip_port()) -> {inet:ip_address(), atom()}).
+-spec(check_tcp_listener_address/2 :: (atom(), listener_config())
+ -> [{inet:ip_address(), ip_port(), family(), atom()}]).
-endif.
@@ -83,7 +90,7 @@ boot() ->
boot_tcp() ->
{ok, TcpListeners} = application:get_env(tcp_listeners),
- [ok = start_tcp_listener(Host, Port) || {Host, Port} <- TcpListeners],
+ [ok = start_tcp_listener(Listener) || Listener <- TcpListeners],
ok.
boot_ssl() ->
@@ -103,7 +110,7 @@ boot_ssl() ->
end}
| SslOptsConfig]
end,
- [start_ssl_listener(Host, Port, SslOpts) || {Host, Port} <- SslListeners],
+ [start_ssl_listener(Listener, SslOpts) || Listener <- SslListeners],
ok
end.
@@ -117,61 +124,97 @@ start() ->
transient, infinity, supervisor, [rabbit_client_sup]}),
ok.
-getaddr(Host) ->
- %% inet_parse:address takes care of ip string, like "0.0.0.0"
- %% inet:getaddr returns immediately for ip tuple {0,0,0,0},
- %% and runs 'inet_gethost' port process for dns lookups.
- %% On Windows inet:getaddr runs dns resolver for ip string, which may fail.
+%% inet_parse:address takes care of ip string, like "0.0.0.0"
+%% inet:getaddr returns immediately for ip tuple {0,0,0,0},
+%% and runs 'inet_gethost' port process for dns lookups.
+%% On Windows inet:getaddr runs dns resolver for ip string, which may fail.
+
+getaddr(Host, Family) ->
case inet_parse:address(Host) of
- {ok, IPAddress1} -> IPAddress1;
- {error, _} ->
- case inet:getaddr(Host, inet) of
- {ok, IPAddress2} -> IPAddress2;
- {error, Reason} ->
- error_logger:error_msg("invalid host ~p - ~p~n",
- [Host, Reason]),
- throw({error, {invalid_host, Host, Reason}})
- end
+ {ok, IPAddress} -> [{IPAddress, resolve_family(IPAddress, Family)}];
+ {error, _} -> gethostaddr(Host, Family)
+ end.
+
+gethostaddr(Host, auto) ->
+ Lookups = [{Family, inet:getaddr(Host, Family)} || Family <- [inet, inet6]],
+ case [{IP, Family} || {Family, {ok, IP}} <- Lookups] of
+ [] -> host_lookup_error(Host, Lookups);
+ IPs -> IPs
+ end;
+
+gethostaddr(Host, Family) ->
+ case inet:getaddr(Host, Family) of
+ {ok, IPAddress} -> [{IPAddress, Family}];
+ {error, Reason} -> host_lookup_error(Host, Reason)
end.
-check_tcp_listener_address(NamePrefix, Host, Port) ->
- IPAddress = getaddr(Host),
+host_lookup_error(Host, Reason) ->
+ error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]),
+ throw({error, {invalid_host, Host, Reason}}).
+
+resolve_family({_,_,_,_}, auto) -> inet;
+resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6;
+resolve_family(IP, auto) -> throw({error, {strange_family, IP}});
+resolve_family(_, F) -> F.
+
+check_tcp_listener_address(NamePrefix, Port) when is_integer(Port) ->
+ check_tcp_listener_address_auto(NamePrefix, Port);
+
+check_tcp_listener_address(NamePrefix, {"auto", Port}) ->
+ %% Variant to prevent lots of hacking around in bash and batch files
+ check_tcp_listener_address_auto(NamePrefix, Port);
+
+check_tcp_listener_address(NamePrefix, {Host, Port}) ->
+ %% auto: determine family IPv4 / IPv6 after converting to IP address
+ check_tcp_listener_address(NamePrefix, {Host, Port, auto});
+
+check_tcp_listener_address(NamePrefix, {Host, Port, Family0}) ->
if is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> ok;
true -> error_logger:error_msg("invalid port ~p - not 0..65535~n",
[Port]),
throw({error, {invalid_port, Port}})
end,
- Name = rabbit_misc:tcp_name(NamePrefix, IPAddress, Port),
- {IPAddress, Name}.
+ [{IPAddress, Port, Family,
+ rabbit_misc:tcp_name(NamePrefix, IPAddress, Port)} ||
+ {IPAddress, Family} <- getaddr(Host, Family0)].
-start_tcp_listener(Host, Port) ->
- start_listener(Host, Port, amqp, "TCP Listener",
+check_tcp_listener_address_auto(NamePrefix, Port) ->
+ lists:append([check_tcp_listener_address(NamePrefix, Listener) ||
+ Listener <- port_to_listeners(Port)]).
+
+start_tcp_listener(Listener) ->
+ start_listener(Listener, amqp, "TCP Listener",
{?MODULE, start_client, []}).
-start_ssl_listener(Host, Port, SslOpts) ->
- start_listener(Host, Port, 'amqp/ssl', "SSL Listener",
+start_ssl_listener(Listener, SslOpts) ->
+ start_listener(Listener, 'amqp/ssl', "SSL Listener",
{?MODULE, start_ssl_client, [SslOpts]}).
-start_listener(Host, Port, Protocol, Label, OnConnect) ->
- {IPAddress, Name} =
- check_tcp_listener_address(rabbit_tcp_listener_sup, Host, Port),
+start_listener(Listener, Protocol, Label, OnConnect) ->
+ [start_listener0(Spec, Protocol, Label, OnConnect) ||
+ Spec <- check_tcp_listener_address(rabbit_tcp_listener_sup, Listener)],
+ ok.
+
+start_listener0({IPAddress, Port, Family, Name}, Protocol, Label, OnConnect) ->
{ok,_} = supervisor:start_child(
rabbit_sup,
{Name,
{tcp_listener_sup, start_link,
- [IPAddress, Port, ?RABBIT_TCP_OPTS ,
+ [IPAddress, Port, [Family | ?RABBIT_TCP_OPTS],
{?MODULE, tcp_listener_started, [Protocol]},
{?MODULE, tcp_listener_stopped, [Protocol]},
OnConnect, Label]},
- transient, infinity, supervisor, [tcp_listener_sup]}),
+ transient, infinity, supervisor, [tcp_listener_sup]}).
+
+stop_tcp_listener(Listener) ->
+ [stop_tcp_listener0(Spec) ||
+ Spec <- check_tcp_listener_address(rabbit_tcp_listener_sup, Listener)],
ok.
-stop_tcp_listener(Host, Port) ->
- IPAddress = getaddr(Host),
+stop_tcp_listener0({IPAddress, Port, _Family, Name}) ->
Name = rabbit_misc:tcp_name(rabbit_tcp_listener_sup, IPAddress, Port),
ok = supervisor:terminate_child(rabbit_sup, Name),
- ok = supervisor:delete_child(rabbit_sup, Name),
- ok.
+ ok = supervisor:delete_child(rabbit_sup, Name).
tcp_listener_started(Protocol, IPAddress, Port) ->
%% We need the ip to distinguish e.g. 0.0.0.0 and 127.0.0.1
@@ -252,15 +295,102 @@ close_connection(Pid, Explanation) ->
%%--------------------------------------------------------------------
tcp_host({0,0,0,0}) ->
- {ok, Hostname} = inet:gethostname(),
- case inet:gethostbyname(Hostname) of
- {ok, #hostent{h_name = Name}} -> Name;
- {error, _Reason} -> Hostname
- end;
+ hostname();
+
+tcp_host({0,0,0,0,0,0,0,0}) ->
+ hostname();
+
tcp_host(IPAddress) ->
case inet:gethostbyaddr(IPAddress) of
{ok, #hostent{h_name = Name}} -> Name;
- {error, _Reason} -> inet_parse:ntoa(IPAddress)
+ {error, _Reason} -> rabbit_misc:ntoa(IPAddress)
+ end.
+
+hostname() ->
+ {ok, Hostname} = inet:gethostname(),
+ case inet:gethostbyname(Hostname) of
+ {ok, #hostent{h_name = Name}} -> Name;
+ {error, _Reason} -> Hostname
end.
cmap(F) -> rabbit_misc:filter_exit_map(F, connections()).
+
+%%--------------------------------------------------------------------
+
+%% There are three kinds of machine (for our purposes).
+%%
+%% * Those which treat IPv4 addresses as a special kind of IPv6 address
+%% ("Single stack")
+%% - Linux by default, Windows Vista and later
+%% - We also treat any (hypothetical?) IPv6-only machine the same way
+%% * Those which consider IPv6 and IPv4 to be completely separate things
+%% ("Dual stack")
+%% - OpenBSD, Windows XP / 2003, Linux if so configured
+%% * Those which do not support IPv6.
+%% - Ancient/weird OSes, Linux if so configured
+%%
+%% How to reconfigure Linux to test this:
+%% Single stack (default):
+%% echo 0 > /proc/sys/net/ipv6/bindv6only
+%% Dual stack:
+%% echo 1 > /proc/sys/net/ipv6/bindv6only
+%% IPv4 only:
+%% add ipv6.disable=1 to GRUB_CMDLINE_LINUX_DEFAULT in /etc/default/grub then
+%% sudo update-grub && sudo reboot
+%%
+%% This matters in (and only in) the case where the sysadmin (or the
+%% app descriptor) has only supplied a port and we wish to bind to
+%% "all addresses". This means different things depending on whether
+%% we're single or dual stack. On single stack binding to "::"
+%% implicitly includes all IPv4 addresses, and subsequently attempting
+%% to bind to "0.0.0.0" will fail. On dual stack, binding to "::" will
+%% only bind to IPv6 addresses, and we need another listener bound to
+%% "0.0.0.0" for IPv4. Finally, on IPv4-only systems we of course only
+%% want to bind to "0.0.0.0".
+%%
+%% Unfortunately it seems there is no way to detect single vs dual stack
+%% apart from attempting to bind to the port.
+port_to_listeners(Port) ->
+ IPv4 = {"0.0.0.0", Port, inet},
+ IPv6 = {"::", Port, inet6},
+ case ipv6_status(?FIRST_TEST_BIND_PORT) of
+ single_stack -> [IPv6];
+ ipv6_only -> [IPv6];
+ dual_stack -> [IPv6, IPv4];
+ ipv4_only -> [IPv4]
+ end.
+
+ipv6_status(TestPort) ->
+ IPv4 = [inet, {ip, {0,0,0,0}}],
+ IPv6 = [inet6, {ip, {0,0,0,0,0,0,0,0}}],
+ case gen_tcp:listen(TestPort, IPv6) of
+ {ok, LSock6} ->
+ case gen_tcp:listen(TestPort, IPv4) of
+ {ok, LSock4} ->
+ %% Dual stack
+ gen_tcp:close(LSock6),
+ gen_tcp:close(LSock4),
+ dual_stack;
+ %% Checking the error here would only let us
+ %% distinguish single stack IPv6 / IPv4 vs IPv6 only,
+ %% which we figure out below anyway.
+ {error, _} ->
+ gen_tcp:close(LSock6),
+ case gen_tcp:listen(TestPort, IPv4) of
+ %% Single stack
+ {ok, LSock4} -> gen_tcp:close(LSock4),
+ single_stack;
+ %% IPv6-only machine. Welcome to the future.
+ {error, eafnosupport} -> ipv6_only;
+ %% Dual stack machine with something already
+ %% on IPv4.
+ {error, _} -> ipv6_status(TestPort + 1)
+ end
+ end;
+ {error, eafnosupport} ->
+ %% IPv4-only machine. Welcome to the 90s.
+ ipv4_only;
+ {error, _} ->
+ %% Port in use
+ ipv6_status(TestPort + 1)
+ end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index f8114d86..475c415e 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -256,7 +256,7 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
Sock, SockTransform) ->
process_flag(trap_exit, true),
{PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1),
- PeerAddressS = inet_parse:ntoa(PeerAddress),
+ PeerAddressS = rabbit_misc:ntoab(PeerAddress),
rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
[self(), PeerAddressS, PeerPort]),
ClientSock = socket_op(Sock, SockTransform),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index f39bc964..7142d560 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -281,12 +281,11 @@
-record(sync, { acks_persistent, acks_all, pubs, funs }).
%% When we discover, on publish, that we should write some indices to
-%% disk for some betas, the RAM_INDEX_BATCH_SIZE sets the number of
-%% betas that we must be due to write indices for before we do any
-%% work at all. This is both a minimum and a maximum - we don't write
-%% fewer than RAM_INDEX_BATCH_SIZE indices out in one go, and we don't
-%% write more - we can always come back on the next publish to do
-%% more.
+%% disk for some betas, the IO_BATCH_SIZE sets the number of betas
+%% that we must be due to write indices for before we do any work at
+%% all. This is both a minimum and a maximum - we don't write fewer
+%% than IO_BATCH_SIZE indices out in one go, and we don't write more -
+%% we can always come back on the next publish to do more.
-define(IO_BATCH_SIZE, 64).
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
@@ -299,7 +298,7 @@
-type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
-type(seq_id() :: non_neg_integer()).
--type(ack() :: seq_id() | 'blank_ack').
+-type(ack() :: seq_id()).
-type(rates() :: #rates { egress :: {timestamp(), non_neg_integer()},
ingress :: {timestamp(), non_neg_integer()},
@@ -509,7 +508,7 @@ publish(Msg, MsgProps, State) ->
publish_delivered(false, #basic_message { guid = Guid },
_MsgProps, State = #vqstate { len = 0 }) ->
blind_confirm(self(), gb_sets:singleton(Guid)),
- {blank_ack, a(State)};
+ {undefined, a(State)};
publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
guid = Guid },
MsgProps = #message_properties {
@@ -628,7 +627,7 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
MsgStatus #msg_status {
is_delivered = true }, State),
{SeqId, StateN};
- false -> {blank_ack, State}
+ false -> {undefined, State}
end,
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
@@ -897,7 +896,7 @@ cons_if(true, E, L) -> [E | L];
cons_if(false, _E, L) -> L.
gb_sets_maybe_insert(false, _Val, Set) -> Set;
-%% when requeueing, we re-add a guid to the unconfimred set
+%% when requeueing, we re-add a guid to the unconfirmed set
gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid },
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 18e2bdad..1a240856 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -359,8 +359,8 @@ handle_cast({delayed_restart, {RestartType, Reason, Child}}, State)
{noreply, NState};
handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) ->
case get_child(Child#child.name, State) of
- {value, Child} ->
- {ok, NState} = do_restart(RestartType, Reason, Child, State),
+ {value, Child1} ->
+ {ok, NState} = do_restart(RestartType, Reason, Child1, State),
{noreply, NState};
_ ->
{noreply, State}
@@ -539,7 +539,7 @@ do_restart({RestartType, Delay}, Reason, Child, State) ->
{ok, _TRef} = timer:apply_after(
trunc(Delay*1000), ?MODULE, delayed_restart,
[self(), {{RestartType, Delay}, Reason, Child}]),
- {ok, NState}
+ {ok, state_del_child(Child, NState)}
end;
do_restart(permanent, Reason, Child, State) ->
report_error(child_terminated, Reason, Child, State#state.name),
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index 194389e3..0d50683d 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -59,8 +59,8 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
{Address, Port} = inet_op(fun () -> inet:sockname(LSock) end),
{PeerAddress, PeerPort} = inet_op(fun () -> inet:peername(Sock) end),
error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n",
- [inet_parse:ntoa(Address), Port,
- inet_parse:ntoa(PeerAddress), PeerPort]),
+ [rabbit_misc:ntoab(Address), Port,
+ rabbit_misc:ntoab(PeerAddress), PeerPort]),
%% In the event that somebody floods us with connections we can spew
%% the above message at error_logger faster than it can keep up.
%% So error_logger's mailbox grows unbounded until we eat all the
diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl
index b1bfcafc..cd646969 100644
--- a/src/tcp_listener.erl
+++ b/src/tcp_listener.erl
@@ -50,8 +50,9 @@ init({IPAddress, Port, SocketOpts,
end,
lists:duplicate(ConcurrentAcceptorCount, dummy)),
{ok, {LIPAddress, LPort}} = inet:sockname(LSock),
- error_logger:info_msg("started ~s on ~s:~p~n",
- [Label, inet_parse:ntoa(LIPAddress), LPort]),
+ error_logger:info_msg(
+ "started ~s on ~s:~p~n",
+ [Label, rabbit_misc:ntoab(LIPAddress), LPort]),
apply(M, F, A ++ [IPAddress, Port]),
{ok, #state{sock = LSock,
on_startup = OnStartup, on_shutdown = OnShutdown,
@@ -59,7 +60,7 @@ init({IPAddress, Port, SocketOpts,
{error, Reason} ->
error_logger:error_msg(
"failed to start ~s on ~s:~p - ~p~n",
- [Label, inet_parse:ntoa(IPAddress), Port, Reason]),
+ [Label, rabbit_misc:ntoab(IPAddress), Port, Reason]),
{stop, {cannot_listen, IPAddress, Port, Reason}}
end.
@@ -76,7 +77,7 @@ terminate(_Reason, #state{sock=LSock, on_shutdown = {M,F,A}, label=Label}) ->
{ok, {IPAddress, Port}} = inet:sockname(LSock),
gen_tcp:close(LSock),
error_logger:info_msg("stopped ~s on ~s:~p~n",
- [Label, inet_parse:ntoa(IPAddress), Port]),
+ [Label, rabbit_misc:ntoab(IPAddress), Port]),
apply(M, F, A ++ [IPAddress, Port]).
code_change(_OldVsn, State, _Extra) ->
diff --git a/src/test_sup.erl b/src/test_sup.erl
index 76be63d0..b4df1fd0 100644
--- a/src/test_sup.erl
+++ b/src/test_sup.erl
@@ -59,19 +59,21 @@ start_child() ->
ping_child(SupPid) ->
Ref = make_ref(),
- get_child_pid(SupPid) ! {ping, Ref, self()},
+ with_child_pid(SupPid, fun(ChildPid) -> ChildPid ! {ping, Ref, self()} end),
receive {pong, Ref} -> ok
after 1000 -> timeout
end.
exit_child(SupPid) ->
- true = exit(get_child_pid(SupPid), abnormal),
+ with_child_pid(SupPid, fun(ChildPid) -> exit(ChildPid, abnormal) end),
ok.
-get_child_pid(SupPid) ->
- [{_Id, ChildPid, worker, [test_sup]}] =
- supervisor2:which_children(SupPid),
- ChildPid.
+with_child_pid(SupPid, Fun) ->
+ case supervisor2:which_children(SupPid) of
+ [{_Id, undefined, worker, [test_sup]}] -> ok;
+ [{_Id, ChildPid, worker, [test_sup]}] -> Fun(ChildPid);
+ [] -> ok
+ end.
run_child() ->
receive {ping, Ref, Pid} -> Pid ! {pong, Ref},