diff options
author | Vlad Alexandru Ionescu <vlad@rabbitmq.com> | 2011-01-27 13:49:27 +0000 |
---|---|---|
committer | Vlad Alexandru Ionescu <vlad@rabbitmq.com> | 2011-01-27 13:49:27 +0000 |
commit | 2d286f26295a4a11141b572d3bb9183b9dc3db3e (patch) | |
tree | 626ae04e659009570f5c853cdfb5a653a774303d /src | |
parent | 37692ccb4afc9545807ff438ebae8d3d83602e40 (diff) | |
parent | ba418f0f7c06821af7ce7d3719f35895830c1acd (diff) | |
download | rabbitmq-server-2d286f26295a4a11141b572d3bb9183b9dc3db3e.tar.gz |
merging in from default
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_amqqueue.erl | 26 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 54 | ||||
-rw-r--r-- | src/rabbit_auth_mechanism_plain.erl | 35 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 2 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 78 | ||||
-rw-r--r-- | src/rabbit_control.erl | 25 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 17 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 103 | ||||
-rw-r--r-- | src/rabbit_multi.erl | 2 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 226 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 2 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 19 | ||||
-rw-r--r-- | src/supervisor2.erl | 6 | ||||
-rw-r--r-- | src/tcp_acceptor.erl | 4 | ||||
-rw-r--r-- | src/tcp_listener.erl | 9 | ||||
-rw-r--r-- | src/test_sup.erl | 14 |
16 files changed, 416 insertions, 206 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ad9e3ce6..a6da551d 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -137,7 +137,9 @@ -> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())). -spec(internal_delete/1 :: (name()) -> rabbit_types:ok_or_error('not_found') | - rabbit_types:connection_exit()). + rabbit_types:connection_exit() | + fun ((boolean()) -> rabbit_types:ok_or_error('not_found') | + rabbit_types:connection_exit())). -spec(maybe_run_queue_via_backing_queue/2 :: (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok'). -spec(maybe_run_queue_via_backing_queue_async/2 :: @@ -215,8 +217,12 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> [_] -> %% Q exists on stopped node rabbit_misc:const(not_found) end; - [ExistingQ] -> - rabbit_misc:const(ExistingQ) + [ExistingQ = #amqqueue{pid = QPid}] -> + case is_process_alive(QPid) of + true -> rabbit_misc:const(ExistingQ); + false -> TailFun = internal_delete(QueueName), + fun (Tx) -> TailFun(Tx), ExistingQ end + end end end). @@ -432,17 +438,15 @@ internal_delete1(QueueName) -> rabbit_binding:remove_for_destination(QueueName). internal_delete(QueueName) -> - rabbit_misc:execute_mnesia_transaction( + rabbit_misc:execute_mnesia_tx_with_tail( fun () -> case mnesia:wread({rabbit_queue, QueueName}) of - [] -> {error, not_found}; - [_] -> internal_delete1(QueueName) + [] -> rabbit_misc:const({error, not_found}); + [_] -> Deletions = internal_delete1(QueueName), + fun (Tx) -> ok = rabbit_binding:process_deletions( + Deletions, Tx) + end end - end, - fun ({error, _} = Err, _Tx) -> - Err; - (Deletions, Tx) -> - ok = rabbit_binding:process_deletions(Deletions, Tx) end). maybe_run_queue_via_backing_queue(QPid, Fun) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 663977ba..3418c663 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -332,11 +332,6 @@ ch_record_state_transition(OldCR, NewCR) -> true -> ok end. -record_current_channel_tx(ChPid, Txn) -> - %% as a side effect this also starts monitoring the channel (if - %% that wasn't happening already) - store_ch_record((ch_record(ChPid))#cr{txn = Txn}). - deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, State = #q{q = #amqqueue{name = QName}, active_consumers = ActiveConsumers, @@ -495,7 +490,7 @@ attempt_delivery(#delivery{txn = Txn, {NeedsConfirming, State = #q{backing_queue = BQ, backing_queue_state = BQS}}) -> - record_current_channel_tx(ChPid, Txn), + store_ch_record((ch_record(ChPid))#cr{txn = Txn}), {true, NeedsConfirming, State#q{backing_queue_state = @@ -591,7 +586,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> true -> {stop, State1}; false -> State2 = case Txn of none -> State1; - _ -> rollback_transaction(Txn, ChPid, + _ -> rollback_transaction(Txn, C, State1) end, {ok, requeue_and_run(sets:to_list(ChAckTags), @@ -627,26 +622,23 @@ maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> run_message_queue( confirm_messages(Guids, State#q{backing_queue_state = BQS1})). -commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, - backing_queue_state = BQS, - ttl = TTL}) -> - {AckTags, BQS1} = BQ:tx_commit(Txn, - fun () -> gen_server2:reply(From, ok) end, - reset_msg_expiry_fun(TTL), - BQS), - %% ChPid must be known here because of the participant management - %% by the channel. - C = #cr{acktags = ChAckTags} = lookup_ch(ChPid), +commit_transaction(Txn, From, C = #cr{acktags = ChAckTags}, + State = #q{backing_queue = BQ, + backing_queue_state = BQS, + ttl = TTL}) -> + {AckTags, BQS1} = BQ:tx_commit( + Txn, fun () -> gen_server2:reply(From, ok) end, + reset_msg_expiry_fun(TTL), BQS), ChAckTags1 = subtract_acks(ChAckTags, AckTags), maybe_store_ch_record(C#cr{acktags = ChAckTags1, txn = none}), State#q{backing_queue_state = BQS1}. -rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> +rollback_transaction(Txn, C, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> {_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS), %% Iff we removed acktags from the channel record on ack+txn then - %% we would add them back in here (would also require ChPid) - record_current_channel_tx(ChPid, none), + %% we would add them back in here. + maybe_store_ch_record(C#cr{txn = none}), State#q{backing_queue_state = BQS1}. subtract_acks(A, B) when is_list(B) -> @@ -848,8 +840,11 @@ handle_call({deliver, Delivery}, From, State) -> noreply(NewState); handle_call({commit, Txn, ChPid}, From, State) -> - NewState = commit_transaction(Txn, From, ChPid, State), - noreply(run_message_queue(NewState)); + case lookup_ch(ChPid) of + not_found -> reply(ok, State); + C -> noreply(run_message_queue( + commit_transaction(Txn, From, C, State))) + end; handle_call({notify_down, ChPid}, _From, State) -> %% we want to do this synchronously, so that auto_deleted queues @@ -1048,7 +1043,10 @@ handle_cast({reject, AckTags, Requeue, ChPid}, end; handle_cast({rollback, Txn, ChPid}, State) -> - noreply(rollback_transaction(Txn, ChPid, State)); + noreply(case lookup_ch(ChPid) of + not_found -> State; + C -> rollback_transaction(Txn, C, State) + end); handle_cast(delete_immediately, State) -> {stop, normal, State}; @@ -1151,15 +1149,15 @@ handle_pre_hibernate(State = #q{backing_queue_state = undefined}) -> handle_pre_hibernate(State = #q{backing_queue = BQ, backing_queue_state = BQS, stats_timer = StatsTimer}) -> - BQS1 = BQ:handle_pre_hibernate(BQS), - %% no activity for a while == 0 egress and ingress rates + {RamDuration, BQS1} = BQ:ram_duration(BQS), DesiredDuration = - rabbit_memory_monitor:report_ram_duration(self(), infinity), + rabbit_memory_monitor:report_ram_duration(self(), RamDuration), BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), + BQS3 = BQ:handle_pre_hibernate(BQS2), rabbit_event:if_enabled(StatsTimer, fun () -> emit_stats(State, [{idle_since, now()}]) end), State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer), - backing_queue_state = BQS2}, + backing_queue_state = BQS3}, {hibernate, stop_rate_timer(State1)}. diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl index 7d9dcd20..1ca07018 100644 --- a/src/rabbit_auth_mechanism_plain.erl +++ b/src/rabbit_auth_mechanism_plain.erl @@ -33,6 +33,10 @@ %% SASL PLAIN, as used by the Qpid Java client and our clients. Also, %% apparently, by OpenAMQ. +%% TODO: once the minimum erlang becomes R13B03, reimplement this +%% using the binary module - that makes use of BIFs to do binary +%% matching and will thus be much faster. + description() -> [{name, <<"PLAIN">>}, {description, <<"SASL PLAIN authentication mechanism">>}]. @@ -41,11 +45,32 @@ init(_Sock) -> []. handle_response(Response, _State) -> - %% The '%%"' at the end of the next line is for Emacs - case re:run(Response, "^\\0([^\\0]*)\\0([^\\0]*)$",%%" - [{capture, all_but_first, binary}]) of - {match, [User, Pass]} -> + case extract_user_pass(Response) of + {ok, User, Pass} -> rabbit_access_control:check_user_pass_login(User, Pass); - _ -> + error -> {protocol_error, "response ~p invalid", [Response]} end. + +extract_user_pass(Response) -> + case extract_elem(Response) of + {ok, User, Response1} -> case extract_elem(Response1) of + {ok, Pass, <<>>} -> {ok, User, Pass}; + _ -> error + end; + error -> error + end. + +extract_elem(<<0:8, Rest/binary>>) -> + Count = next_null_pos(Rest), + <<Elem:Count/binary, Rest1/binary>> = Rest, + {ok, Elem, Rest1}; +extract_elem(_) -> + error. + +next_null_pos(Bin) -> + next_null_pos(Bin, 0). + +next_null_pos(<<>>, Count) -> Count; +next_null_pos(<<0:8, _Rest/binary>>, Count) -> Count; +next_null_pos(<<_:8, Rest/binary>>, Count) -> next_null_pos(Rest, Count + 1). diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index b270927b..96a22dca 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -331,7 +331,7 @@ group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings) -> group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc), Removed). maybe_auto_delete(XName, Bindings, Deletions) -> - case mnesia:read(rabbit_exchange, XName) of + case mnesia:read({rabbit_exchange, XName}) of [] -> add_deletion(XName, {undefined, not_deleted, Bindings}, Deletions); [X] -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b92206ad..91559ea6 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -41,8 +41,10 @@ -define(STATISTICS_KEYS, [pid, transactional, + confirm, consumer_count, messages_unacknowledged, + messages_unconfirmed, acks_uncommitted, prefetch_count, client_flow_blocked]). @@ -280,12 +282,12 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, %% TODO: this does a complete scan and partial rebuild of the %% tree, which is quite efficient. To do better we'd need to %% maintain a secondary mapping, from QPids to MsgSeqNos. - {MsgSeqNos, UC1} = remove_queue_unconfirmed( - gb_trees:next(gb_trees:iterator(UC)), QPid, - {[], UC}), + {MXs, UC1} = remove_queue_unconfirmed( + gb_trees:next(gb_trees:iterator(UC)), QPid, + {[], UC}, State), erase_queue_stats(QPid), - noreply(queue_blocked(QPid, record_confirms(MsgSeqNos, - State#ch{unconfirmed = UC1}))). + noreply( + queue_blocked(QPid, record_confirms(MXs, State#ch{unconfirmed = UC1}))). handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), @@ -471,38 +473,42 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -remove_queue_unconfirmed(none, _QPid, Acc) -> +remove_queue_unconfirmed(none, _QPid, Acc, _State) -> Acc; -remove_queue_unconfirmed({MsgSeqNo, Qs, Next}, QPid, Acc) -> +remove_queue_unconfirmed({MsgSeqNo, XQ, Next}, QPid, Acc, State) -> remove_queue_unconfirmed(gb_trees:next(Next), QPid, - remove_qmsg(MsgSeqNo, QPid, Qs, Acc)). + remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State), + State). -record_confirm(undefined, State) -> State; -record_confirm(MsgSeqNo, State) -> record_confirms([MsgSeqNo], State). +record_confirm(undefined, _, State) -> + State; +record_confirm(MsgSeqNo, XName, State) -> + record_confirms([{MsgSeqNo, XName}], State). record_confirms([], State) -> State; -record_confirms(MsgSeqNos, State = #ch{confirmed = C}) -> - State#ch{confirmed = [MsgSeqNos | C]}. +record_confirms(MXs, State = #ch{confirmed = C}) -> + State#ch{confirmed = [MXs | C]}. confirm([], _QPid, State) -> State; confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> - {DoneMessages, UC2} = + {MXs, UC1} = lists:foldl( fun(MsgSeqNo, {_DMs, UC0} = Acc) -> case gb_trees:lookup(MsgSeqNo, UC0) of none -> Acc; - {value, Qs} -> remove_qmsg(MsgSeqNo, QPid, Qs, Acc) + {value, XQ} -> remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State) end end, {[], UC}, MsgSeqNos), - record_confirms(DoneMessages, State#ch{unconfirmed = UC2}). + record_confirms(MXs, State#ch{unconfirmed = UC1}). -remove_qmsg(MsgSeqNo, QPid, Qs, {MsgSeqNos, UC}) -> +remove_qmsg(MsgSeqNo, QPid, {XName, Qs}, {MXs, UC}, State) -> Qs1 = sets:del_element(QPid, Qs), + maybe_incr_stats([{{QPid, XName}, 1}], confirm, State), case sets:size(Qs1) of - 0 -> {[MsgSeqNo | MsgSeqNos], gb_trees:delete(MsgSeqNo, UC)}; - _ -> {MsgSeqNos, gb_trees:update(MsgSeqNo, Qs1, UC)} + 0 -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UC)}; + _ -> {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UC)} end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> @@ -555,7 +561,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, Exchange, rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, MsgSeqNo)), - State2 = process_routing_result(RoutingRes, DeliveredQPids, + State2 = process_routing_result(RoutingRes, DeliveredQPids, ExchangeName, MsgSeqNo, Message, State1), maybe_incr_stats([{ExchangeName, 1} | [{{QPid, ExchangeName}, 1} || @@ -1222,20 +1228,20 @@ is_message_persistent(Content) -> IsPersistent end. -process_routing_result(unroutable, _, MsgSeqNo, Message, State) -> - ok = basic_return(Message, State#ch.writer_pid, no_route), - record_confirm(MsgSeqNo, State); -process_routing_result(not_delivered, _, MsgSeqNo, Message, State) -> - ok = basic_return(Message, State#ch.writer_pid, no_consumers), - record_confirm(MsgSeqNo, State); -process_routing_result(routed, [], MsgSeqNo, _, State) -> - record_confirm(MsgSeqNo, State); -process_routing_result(routed, _, undefined, _, State) -> +process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> + ok = basic_return(Msg, State#ch.writer_pid, no_route), + record_confirm(MsgSeqNo, XName, State); +process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) -> + ok = basic_return(Msg, State#ch.writer_pid, no_consumers), + record_confirm(MsgSeqNo, XName, State); +process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> + record_confirm(MsgSeqNo, XName, State); +process_routing_result(routed, _, _, undefined, _, State) -> State; -process_routing_result(routed, QPids, MsgSeqNo, _, State) -> +process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> #ch{unconfirmed = UC} = State, [maybe_monitor(QPid) || QPid <- QPids], - UC1 = gb_trees:insert(MsgSeqNo, sets:from_list(QPids), UC), + UC1 = gb_trees:insert(MsgSeqNo, {XName, sets:from_list(QPids)}, UC), State#ch{unconfirmed = UC1}. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> @@ -1244,8 +1250,11 @@ lock_message(false, _MsgStruct, State) -> State. send_confirms(State = #ch{confirmed = C}) -> - send_confirms(lists:append(C), State #ch{confirmed = []}). - + C1 = lists:append(C), + MsgSeqNos = [ begin maybe_incr_stats([{ExchangeName, 1}], confirm, State), + MsgSeqNo + end || {MsgSeqNo, ExchangeName} <- C1 ], + send_confirms(MsgSeqNos, State #ch{confirmed = []}). send_confirms([], State) -> State; send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) -> @@ -1255,7 +1264,7 @@ send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> SCs = lists:usort(Cs), CutOff = case gb_trees:is_empty(UC) of true -> lists:last(SCs) + 1; - false -> {SeqNo, _Qs} = gb_trees:smallest(UC), SeqNo + false -> {SeqNo, _XQ} = gb_trees:smallest(UC), SeqNo end, {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SCs), case Ms of @@ -1283,8 +1292,11 @@ i(number, #ch{channel = Channel}) -> Channel; i(user, #ch{user = User}) -> User#user.username; i(vhost, #ch{virtual_host = VHost}) -> VHost; i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none; +i(confirm, #ch{confirm_enabled = CE}) -> CE; i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> dict:size(ConsumerMapping); +i(messages_unconfirmed, #ch{unconfirmed = UC}) -> + gb_trees:size(UC); i(messages_unacknowledged, #ch{unacked_message_q = UAMQ, uncommitted_ack_q = UAQ}) -> queue:len(UAMQ) + queue:len(UAQ); diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 4228ff7f..80483097 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -77,24 +77,24 @@ start() -> true -> ok; false -> io:format("...done.~n") end, - halt(); + quit(0); {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> print_error("invalid command '~s'", [string:join([atom_to_list(Command) | Args], " ")]), usage(); {error, Reason} -> print_error("~p", [Reason]), - halt(2); + quit(2); {badrpc, {'EXIT', Reason}} -> print_error("~p", [Reason]), - halt(2); + quit(2); {badrpc, Reason} -> print_error("unable to connect to node ~w: ~w", [Node, Reason]), print_badrpc_diagnostics(Node), - halt(2); + quit(2); Other -> print_error("~p", [Other]), - halt(2) + quit(2) end. fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args). @@ -130,7 +130,7 @@ stop() -> usage() -> io:format("~s", [rabbit_ctl_usage:usage()]), - halt(1). + quit(1). action(stop, Node, [], _Opts, Inform) -> Inform("Stopping and halting node ~p", [Node]), @@ -327,11 +327,11 @@ format_info_item(#resource{name = Name}) -> escape(Name); format_info_item({N1, N2, N3, N4} = Value) when ?IS_U8(N1), ?IS_U8(N2), ?IS_U8(N3), ?IS_U8(N4) -> - inet_parse:ntoa(Value); + rabbit_misc:ntoa(Value); format_info_item({K1, K2, K3, K4, K5, K6, K7, K8} = Value) when ?IS_U16(K1), ?IS_U16(K2), ?IS_U16(K3), ?IS_U16(K4), ?IS_U16(K5), ?IS_U16(K6), ?IS_U16(K7), ?IS_U16(K8) -> - inet_parse:ntoa(Value); + rabbit_misc:ntoa(Value); format_info_item(Value) when is_pid(Value) -> rabbit_misc:pid_to_string(Value); format_info_item(Value) when is_binary(Value) -> @@ -393,3 +393,12 @@ prettify_typed_amqp_value(Type, Value) -> array -> [prettify_typed_amqp_value(T, V) || {T, V} <- Value]; _ -> Value end. + +% the slower shutdown on windows required to flush stdout +quit(Status) -> + case os:type() of + {unix, _} -> + halt(Status); + {win32, _} -> + init:stop(Status) + end. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 03317d70..3a4fb024 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -55,6 +55,7 @@ -export([now_ms/0]). -export([lock_file/1]). -export([const_ok/1, const/1]). +-export([ntoa/1, ntoab/1]). %%---------------------------------------------------------------------------- @@ -191,6 +192,8 @@ -spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')). -spec(const_ok/1 :: (any()) -> 'ok'). -spec(const/1 :: (A) -> const(A)). +-spec(ntoa/1 :: (inet:ip_address()) -> string()). +-spec(ntoab/1 :: (inet:ip_address()) -> string()). -endif. @@ -832,3 +835,17 @@ lock_file(Path) -> const_ok(_) -> ok. const(X) -> fun (_) -> X end. + +%% Format IPv4-mapped IPv6 addresses as IPv4, since they're what we see +%% when IPv6 is enabled but not used (i.e. 99% of the time). +ntoa({0,0,0,0,0,16#ffff,AB,CD}) -> + inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256}); +ntoa(IP) -> + inet_parse:ntoa(IP). + +ntoab(IP) -> + Str = ntoa(IP), + case string:str(Str, ":") of + 0 -> Str; + _ -> "[" ++ Str ++ "]" + end. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 529e3e07..e9c356e1 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -738,45 +738,36 @@ handle_call({contains, Guid}, From, State) -> handle_cast({client_dying, CRef}, State = #msstate { dying_clients = DyingClients }) -> DyingClients1 = sets:add_element(CRef, DyingClients), - write_message(CRef, <<>>, State #msstate { dying_clients = DyingClients1 }); + noreply(write_message(CRef, <<>>, + State #msstate { dying_clients = DyingClients1 })); handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) -> State1 = State #msstate { clients = dict:erase(CRef, Clients) }, noreply(remove_message(CRef, CRef, clear_client(CRef, State1))); handle_cast({write, CRef, Guid}, - State = #msstate { file_summary_ets = FileSummaryEts, - cur_file_cache_ets = CurFileCacheEts }) -> + State = #msstate { cur_file_cache_ets = CurFileCacheEts }) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), - case should_mask_action(CRef, Guid, State) of - {true, _Location} -> - noreply(State); - {false, not_found} -> - write_message(CRef, Guid, Msg, State); - {Mask, #msg_location { ref_count = 0, file = File, - total_size = TotalSize }} -> - case {Mask, ets:lookup(FileSummaryEts, File)} of - {false, [#file_summary { locked = true }]} -> - ok = index_delete(Guid, State), - write_message(CRef, Guid, Msg, State); - {false_if_increment, [#file_summary { locked = true }]} -> - %% The msg for Guid is older than the client death - %% message, but as it is being GC'd currently, - %% we'll have to write a new copy, which will then - %% be younger, so ignore this write. - noreply(State); - {_Mask, [#file_summary {}]} -> - ok = index_update_ref_count(Guid, 1, State), - State1 = client_confirm_if_on_disk(CRef, Guid, File, State), - noreply(adjust_valid_total_size(File, TotalSize, State1)) - end; - {_Mask, #msg_location { ref_count = RefCount, file = File }} -> - %% We already know about it, just update counter. Only - %% update field otherwise bad interaction with concurrent GC - ok = index_update_ref_count(Guid, RefCount + 1, State), - noreply(client_confirm_if_on_disk(CRef, Guid, File, State)) - end; + noreply( + case write_action(should_mask_action(CRef, Guid, State), Guid, State) of + {write, State1} -> + write_message(CRef, Guid, Msg, State1); + {ignore, CurFile, State1 = #msstate { current_file = CurFile }} -> + State1; + {ignore, _File, State1} -> + true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}), + State1; + {confirm, CurFile, State1 = #msstate { current_file = CurFile }}-> + record_pending_confirm(CRef, Guid, State1); + {confirm, _File, State1} -> + true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}), + update_pending_confirms( + fun (MsgOnDiskFun, CTG) -> + MsgOnDiskFun(gb_sets:singleton(Guid), written), + CTG + end, CRef, State1) + end); handle_cast({remove, CRef, Guids}, State) -> State1 = lists:foldl( @@ -924,6 +915,37 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, [client_confirm(CRef, Guids, written, State1) || {CRef, Guids} <- CGs], State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }. +write_action({true, not_found}, _Guid, State) -> + {ignore, undefined, State}; +write_action({true, #msg_location { file = File }}, _Guid, State) -> + {ignore, File, State}; +write_action({false, not_found}, _Guid, State) -> + {write, State}; +write_action({Mask, #msg_location { ref_count = 0, file = File, + total_size = TotalSize }}, + Guid, State = #msstate { file_summary_ets = FileSummaryEts }) -> + case {Mask, ets:lookup(FileSummaryEts, File)} of + {false, [#file_summary { locked = true }]} -> + ok = index_delete(Guid, State), + {write, State}; + {false_if_increment, [#file_summary { locked = true }]} -> + %% The msg for Guid is older than the client death + %% message, but as it is being GC'd currently we'll have + %% to write a new copy, which will then be younger, so + %% ignore this write. + {ignore, File, State}; + {_Mask, [#file_summary {}]} -> + ok = index_update_ref_count(Guid, 1, State), + State1 = adjust_valid_total_size(File, TotalSize, State), + {confirm, File, State1} + end; +write_action({_Mask, #msg_location { ref_count = RefCount, file = File }}, + Guid, State) -> + ok = index_update_ref_count(Guid, RefCount + 1, State), + %% We already know about it, just update counter. Only update + %% field otherwise bad interaction with concurrent GC + {confirm, File, State}. + write_message(CRef, Guid, Msg, State) -> write_message(Guid, Msg, record_pending_confirm(CRef, Guid, State)). @@ -943,11 +965,10 @@ write_message(Guid, Msg, [_,_] = ets:update_counter(FileSummaryEts, CurFile, [{#file_summary.valid_total_size, TotalSize}, {#file_summary.file_size, TotalSize}]), - NextOffset = CurOffset + TotalSize, - noreply(maybe_roll_to_new_file( - NextOffset, State #msstate { - sum_valid_data = SumValid + TotalSize, - sum_file_size = SumFileSize + TotalSize })). + maybe_roll_to_new_file(CurOffset + TotalSize, + State #msstate { + sum_valid_data = SumValid + TotalSize, + sum_file_size = SumFileSize + TotalSize }). read_message(Guid, From, State = #msstate { dedup_cache_ets = DedupCacheEts }) -> @@ -1134,16 +1155,6 @@ record_pending_confirm(CRef, Guid, State) -> gb_sets:singleton(Guid), CTG) end, CRef, State). -client_confirm_if_on_disk(CRef, Guid, CurFile, - State = #msstate { current_file = CurFile }) -> - record_pending_confirm(CRef, Guid, State); -client_confirm_if_on_disk(CRef, Guid, _File, State) -> - update_pending_confirms( - fun (MsgOnDiskFun, CTG) -> - MsgOnDiskFun(gb_sets:singleton(Guid), written), - CTG - end, CRef, State). - client_confirm(CRef, Guids, ActionTaken, State) -> update_pending_confirms( fun (MsgOnDiskFun, CTG) -> diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index 7c07c4fe..ebd7fe8a 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -336,6 +336,8 @@ get_node_tcp_listener() -> case application:get_env(rabbit, tcp_listeners) of {ok, [{_IpAddy, _Port} = Listener]} -> Listener; + {ok, [Port]} when is_number(Port) -> + {"0.0.0.0", Port}; {ok, []} -> undefined; {ok, Other} -> diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 0a7d9dd7..283d25c7 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -16,15 +16,15 @@ -module(rabbit_networking). --export([boot/0, start/0, start_tcp_listener/2, start_ssl_listener/3, - stop_tcp_listener/2, on_node_down/1, active_listeners/0, +-export([boot/0, start/0, start_tcp_listener/1, start_ssl_listener/2, + stop_tcp_listener/1, on_node_down/1, active_listeners/0, node_listeners/1, connections/0, connection_info_keys/0, connection_info/1, connection_info/2, connection_info_all/0, connection_info_all/1, close_connection/2]). %%used by TCP-based transports, e.g. STOMP adapter --export([check_tcp_listener_address/3]). +-export([check_tcp_listener_address/2]). -export([tcp_listener_started/3, tcp_listener_stopped/3, start_client/1, start_ssl_client/2]). @@ -44,17 +44,24 @@ -define(SSL_TIMEOUT, 5). %% seconds +-define(FIRST_TEST_BIND_PORT, 10000). + %%---------------------------------------------------------------------------- -ifdef(use_specs). -export_type([ip_port/0, hostname/0]). +-type(family() :: atom()). +-type(listener_config() :: ip_port() | + {hostname(), ip_port()} | + {hostname(), ip_port(), family()}). + -spec(start/0 :: () -> 'ok'). --spec(start_tcp_listener/2 :: (hostname(), ip_port()) -> 'ok'). --spec(start_ssl_listener/3 :: (hostname(), ip_port(), rabbit_types:infos()) - -> 'ok'). --spec(stop_tcp_listener/2 :: (hostname(), ip_port()) -> 'ok'). +-spec(start_tcp_listener/1 :: (listener_config()) -> 'ok'). +-spec(start_ssl_listener/2 :: + (listener_config(), rabbit_types:infos()) -> 'ok'). +-spec(stop_tcp_listener/1 :: (listener_config()) -> 'ok'). -spec(active_listeners/0 :: () -> [rabbit_types:listener()]). -spec(node_listeners/1 :: (node()) -> [rabbit_types:listener()]). -spec(connections/0 :: () -> [rabbit_types:connection()]). @@ -69,8 +76,8 @@ (rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(close_connection/2 :: (pid(), string()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). --spec(check_tcp_listener_address/3 :: - (atom(), hostname(), ip_port()) -> {inet:ip_address(), atom()}). +-spec(check_tcp_listener_address/2 :: (atom(), listener_config()) + -> [{inet:ip_address(), ip_port(), family(), atom()}]). -endif. @@ -83,7 +90,7 @@ boot() -> boot_tcp() -> {ok, TcpListeners} = application:get_env(tcp_listeners), - [ok = start_tcp_listener(Host, Port) || {Host, Port} <- TcpListeners], + [ok = start_tcp_listener(Listener) || Listener <- TcpListeners], ok. boot_ssl() -> @@ -103,7 +110,7 @@ boot_ssl() -> end} | SslOptsConfig] end, - [start_ssl_listener(Host, Port, SslOpts) || {Host, Port} <- SslListeners], + [start_ssl_listener(Listener, SslOpts) || Listener <- SslListeners], ok end. @@ -117,61 +124,97 @@ start() -> transient, infinity, supervisor, [rabbit_client_sup]}), ok. -getaddr(Host) -> - %% inet_parse:address takes care of ip string, like "0.0.0.0" - %% inet:getaddr returns immediately for ip tuple {0,0,0,0}, - %% and runs 'inet_gethost' port process for dns lookups. - %% On Windows inet:getaddr runs dns resolver for ip string, which may fail. +%% inet_parse:address takes care of ip string, like "0.0.0.0" +%% inet:getaddr returns immediately for ip tuple {0,0,0,0}, +%% and runs 'inet_gethost' port process for dns lookups. +%% On Windows inet:getaddr runs dns resolver for ip string, which may fail. + +getaddr(Host, Family) -> case inet_parse:address(Host) of - {ok, IPAddress1} -> IPAddress1; - {error, _} -> - case inet:getaddr(Host, inet) of - {ok, IPAddress2} -> IPAddress2; - {error, Reason} -> - error_logger:error_msg("invalid host ~p - ~p~n", - [Host, Reason]), - throw({error, {invalid_host, Host, Reason}}) - end + {ok, IPAddress} -> [{IPAddress, resolve_family(IPAddress, Family)}]; + {error, _} -> gethostaddr(Host, Family) + end. + +gethostaddr(Host, auto) -> + Lookups = [{Family, inet:getaddr(Host, Family)} || Family <- [inet, inet6]], + case [{IP, Family} || {Family, {ok, IP}} <- Lookups] of + [] -> host_lookup_error(Host, Lookups); + IPs -> IPs + end; + +gethostaddr(Host, Family) -> + case inet:getaddr(Host, Family) of + {ok, IPAddress} -> [{IPAddress, Family}]; + {error, Reason} -> host_lookup_error(Host, Reason) end. -check_tcp_listener_address(NamePrefix, Host, Port) -> - IPAddress = getaddr(Host), +host_lookup_error(Host, Reason) -> + error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]), + throw({error, {invalid_host, Host, Reason}}). + +resolve_family({_,_,_,_}, auto) -> inet; +resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6; +resolve_family(IP, auto) -> throw({error, {strange_family, IP}}); +resolve_family(_, F) -> F. + +check_tcp_listener_address(NamePrefix, Port) when is_integer(Port) -> + check_tcp_listener_address_auto(NamePrefix, Port); + +check_tcp_listener_address(NamePrefix, {"auto", Port}) -> + %% Variant to prevent lots of hacking around in bash and batch files + check_tcp_listener_address_auto(NamePrefix, Port); + +check_tcp_listener_address(NamePrefix, {Host, Port}) -> + %% auto: determine family IPv4 / IPv6 after converting to IP address + check_tcp_listener_address(NamePrefix, {Host, Port, auto}); + +check_tcp_listener_address(NamePrefix, {Host, Port, Family0}) -> if is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> ok; true -> error_logger:error_msg("invalid port ~p - not 0..65535~n", [Port]), throw({error, {invalid_port, Port}}) end, - Name = rabbit_misc:tcp_name(NamePrefix, IPAddress, Port), - {IPAddress, Name}. + [{IPAddress, Port, Family, + rabbit_misc:tcp_name(NamePrefix, IPAddress, Port)} || + {IPAddress, Family} <- getaddr(Host, Family0)]. -start_tcp_listener(Host, Port) -> - start_listener(Host, Port, amqp, "TCP Listener", +check_tcp_listener_address_auto(NamePrefix, Port) -> + lists:append([check_tcp_listener_address(NamePrefix, Listener) || + Listener <- port_to_listeners(Port)]). + +start_tcp_listener(Listener) -> + start_listener(Listener, amqp, "TCP Listener", {?MODULE, start_client, []}). -start_ssl_listener(Host, Port, SslOpts) -> - start_listener(Host, Port, 'amqp/ssl', "SSL Listener", +start_ssl_listener(Listener, SslOpts) -> + start_listener(Listener, 'amqp/ssl', "SSL Listener", {?MODULE, start_ssl_client, [SslOpts]}). -start_listener(Host, Port, Protocol, Label, OnConnect) -> - {IPAddress, Name} = - check_tcp_listener_address(rabbit_tcp_listener_sup, Host, Port), +start_listener(Listener, Protocol, Label, OnConnect) -> + [start_listener0(Spec, Protocol, Label, OnConnect) || + Spec <- check_tcp_listener_address(rabbit_tcp_listener_sup, Listener)], + ok. + +start_listener0({IPAddress, Port, Family, Name}, Protocol, Label, OnConnect) -> {ok,_} = supervisor:start_child( rabbit_sup, {Name, {tcp_listener_sup, start_link, - [IPAddress, Port, ?RABBIT_TCP_OPTS , + [IPAddress, Port, [Family | ?RABBIT_TCP_OPTS], {?MODULE, tcp_listener_started, [Protocol]}, {?MODULE, tcp_listener_stopped, [Protocol]}, OnConnect, Label]}, - transient, infinity, supervisor, [tcp_listener_sup]}), + transient, infinity, supervisor, [tcp_listener_sup]}). + +stop_tcp_listener(Listener) -> + [stop_tcp_listener0(Spec) || + Spec <- check_tcp_listener_address(rabbit_tcp_listener_sup, Listener)], ok. -stop_tcp_listener(Host, Port) -> - IPAddress = getaddr(Host), +stop_tcp_listener0({IPAddress, Port, _Family, Name}) -> Name = rabbit_misc:tcp_name(rabbit_tcp_listener_sup, IPAddress, Port), ok = supervisor:terminate_child(rabbit_sup, Name), - ok = supervisor:delete_child(rabbit_sup, Name), - ok. + ok = supervisor:delete_child(rabbit_sup, Name). tcp_listener_started(Protocol, IPAddress, Port) -> %% We need the ip to distinguish e.g. 0.0.0.0 and 127.0.0.1 @@ -252,15 +295,102 @@ close_connection(Pid, Explanation) -> %%-------------------------------------------------------------------- tcp_host({0,0,0,0}) -> - {ok, Hostname} = inet:gethostname(), - case inet:gethostbyname(Hostname) of - {ok, #hostent{h_name = Name}} -> Name; - {error, _Reason} -> Hostname - end; + hostname(); + +tcp_host({0,0,0,0,0,0,0,0}) -> + hostname(); + tcp_host(IPAddress) -> case inet:gethostbyaddr(IPAddress) of {ok, #hostent{h_name = Name}} -> Name; - {error, _Reason} -> inet_parse:ntoa(IPAddress) + {error, _Reason} -> rabbit_misc:ntoa(IPAddress) + end. + +hostname() -> + {ok, Hostname} = inet:gethostname(), + case inet:gethostbyname(Hostname) of + {ok, #hostent{h_name = Name}} -> Name; + {error, _Reason} -> Hostname end. cmap(F) -> rabbit_misc:filter_exit_map(F, connections()). + +%%-------------------------------------------------------------------- + +%% There are three kinds of machine (for our purposes). +%% +%% * Those which treat IPv4 addresses as a special kind of IPv6 address +%% ("Single stack") +%% - Linux by default, Windows Vista and later +%% - We also treat any (hypothetical?) IPv6-only machine the same way +%% * Those which consider IPv6 and IPv4 to be completely separate things +%% ("Dual stack") +%% - OpenBSD, Windows XP / 2003, Linux if so configured +%% * Those which do not support IPv6. +%% - Ancient/weird OSes, Linux if so configured +%% +%% How to reconfigure Linux to test this: +%% Single stack (default): +%% echo 0 > /proc/sys/net/ipv6/bindv6only +%% Dual stack: +%% echo 1 > /proc/sys/net/ipv6/bindv6only +%% IPv4 only: +%% add ipv6.disable=1 to GRUB_CMDLINE_LINUX_DEFAULT in /etc/default/grub then +%% sudo update-grub && sudo reboot +%% +%% This matters in (and only in) the case where the sysadmin (or the +%% app descriptor) has only supplied a port and we wish to bind to +%% "all addresses". This means different things depending on whether +%% we're single or dual stack. On single stack binding to "::" +%% implicitly includes all IPv4 addresses, and subsequently attempting +%% to bind to "0.0.0.0" will fail. On dual stack, binding to "::" will +%% only bind to IPv6 addresses, and we need another listener bound to +%% "0.0.0.0" for IPv4. Finally, on IPv4-only systems we of course only +%% want to bind to "0.0.0.0". +%% +%% Unfortunately it seems there is no way to detect single vs dual stack +%% apart from attempting to bind to the port. +port_to_listeners(Port) -> + IPv4 = {"0.0.0.0", Port, inet}, + IPv6 = {"::", Port, inet6}, + case ipv6_status(?FIRST_TEST_BIND_PORT) of + single_stack -> [IPv6]; + ipv6_only -> [IPv6]; + dual_stack -> [IPv6, IPv4]; + ipv4_only -> [IPv4] + end. + +ipv6_status(TestPort) -> + IPv4 = [inet, {ip, {0,0,0,0}}], + IPv6 = [inet6, {ip, {0,0,0,0,0,0,0,0}}], + case gen_tcp:listen(TestPort, IPv6) of + {ok, LSock6} -> + case gen_tcp:listen(TestPort, IPv4) of + {ok, LSock4} -> + %% Dual stack + gen_tcp:close(LSock6), + gen_tcp:close(LSock4), + dual_stack; + %% Checking the error here would only let us + %% distinguish single stack IPv6 / IPv4 vs IPv6 only, + %% which we figure out below anyway. + {error, _} -> + gen_tcp:close(LSock6), + case gen_tcp:listen(TestPort, IPv4) of + %% Single stack + {ok, LSock4} -> gen_tcp:close(LSock4), + single_stack; + %% IPv6-only machine. Welcome to the future. + {error, eafnosupport} -> ipv6_only; + %% Dual stack machine with something already + %% on IPv4. + {error, _} -> ipv6_status(TestPort + 1) + end + end; + {error, eafnosupport} -> + %% IPv4-only machine. Welcome to the 90s. + ipv4_only; + {error, _} -> + %% Port in use + ipv6_status(TestPort + 1) + end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index f8114d86..475c415e 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -256,7 +256,7 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock, SockTransform) -> process_flag(trap_exit, true), {PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1), - PeerAddressS = inet_parse:ntoa(PeerAddress), + PeerAddressS = rabbit_misc:ntoab(PeerAddress), rabbit_log:info("starting TCP connection ~p from ~s:~p~n", [self(), PeerAddressS, PeerPort]), ClientSock = socket_op(Sock, SockTransform), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index f39bc964..7142d560 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -281,12 +281,11 @@ -record(sync, { acks_persistent, acks_all, pubs, funs }). %% When we discover, on publish, that we should write some indices to -%% disk for some betas, the RAM_INDEX_BATCH_SIZE sets the number of -%% betas that we must be due to write indices for before we do any -%% work at all. This is both a minimum and a maximum - we don't write -%% fewer than RAM_INDEX_BATCH_SIZE indices out in one go, and we don't -%% write more - we can always come back on the next publish to do -%% more. +%% disk for some betas, the IO_BATCH_SIZE sets the number of betas +%% that we must be due to write indices for before we do any work at +%% all. This is both a minimum and a maximum - we don't write fewer +%% than IO_BATCH_SIZE indices out in one go, and we don't write more - +%% we can always come back on the next publish to do more. -define(IO_BATCH_SIZE, 64). -define(PERSISTENT_MSG_STORE, msg_store_persistent). -define(TRANSIENT_MSG_STORE, msg_store_transient). @@ -299,7 +298,7 @@ -type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}). -type(seq_id() :: non_neg_integer()). --type(ack() :: seq_id() | 'blank_ack'). +-type(ack() :: seq_id()). -type(rates() :: #rates { egress :: {timestamp(), non_neg_integer()}, ingress :: {timestamp(), non_neg_integer()}, @@ -509,7 +508,7 @@ publish(Msg, MsgProps, State) -> publish_delivered(false, #basic_message { guid = Guid }, _MsgProps, State = #vqstate { len = 0 }) -> blind_confirm(self(), gb_sets:singleton(Guid)), - {blank_ack, a(State)}; + {undefined, a(State)}; publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, MsgProps = #message_properties { @@ -628,7 +627,7 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { MsgStatus #msg_status { is_delivered = true }, State), {SeqId, StateN}; - false -> {blank_ack, State} + false -> {undefined, State} end, PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), @@ -897,7 +896,7 @@ cons_if(true, E, L) -> [E | L]; cons_if(false, _E, L) -> L. gb_sets_maybe_insert(false, _Val, Set) -> Set; -%% when requeueing, we re-add a guid to the unconfimred set +%% when requeueing, we re-add a guid to the unconfirmed set gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set). msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 18e2bdad..1a240856 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -359,8 +359,8 @@ handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) {noreply, NState}; handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) -> case get_child(Child#child.name, State) of - {value, Child} -> - {ok, NState} = do_restart(RestartType, Reason, Child, State), + {value, Child1} -> + {ok, NState} = do_restart(RestartType, Reason, Child1, State), {noreply, NState}; _ -> {noreply, State} @@ -539,7 +539,7 @@ do_restart({RestartType, Delay}, Reason, Child, State) -> {ok, _TRef} = timer:apply_after( trunc(Delay*1000), ?MODULE, delayed_restart, [self(), {{RestartType, Delay}, Reason, Child}]), - {ok, NState} + {ok, state_del_child(Child, NState)} end; do_restart(permanent, Reason, Child, State) -> report_error(child_terminated, Reason, Child, State#state.name), diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index 194389e3..0d50683d 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -59,8 +59,8 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, {Address, Port} = inet_op(fun () -> inet:sockname(LSock) end), {PeerAddress, PeerPort} = inet_op(fun () -> inet:peername(Sock) end), error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n", - [inet_parse:ntoa(Address), Port, - inet_parse:ntoa(PeerAddress), PeerPort]), + [rabbit_misc:ntoab(Address), Port, + rabbit_misc:ntoab(PeerAddress), PeerPort]), %% In the event that somebody floods us with connections we can spew %% the above message at error_logger faster than it can keep up. %% So error_logger's mailbox grows unbounded until we eat all the diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl index b1bfcafc..cd646969 100644 --- a/src/tcp_listener.erl +++ b/src/tcp_listener.erl @@ -50,8 +50,9 @@ init({IPAddress, Port, SocketOpts, end, lists:duplicate(ConcurrentAcceptorCount, dummy)), {ok, {LIPAddress, LPort}} = inet:sockname(LSock), - error_logger:info_msg("started ~s on ~s:~p~n", - [Label, inet_parse:ntoa(LIPAddress), LPort]), + error_logger:info_msg( + "started ~s on ~s:~p~n", + [Label, rabbit_misc:ntoab(LIPAddress), LPort]), apply(M, F, A ++ [IPAddress, Port]), {ok, #state{sock = LSock, on_startup = OnStartup, on_shutdown = OnShutdown, @@ -59,7 +60,7 @@ init({IPAddress, Port, SocketOpts, {error, Reason} -> error_logger:error_msg( "failed to start ~s on ~s:~p - ~p~n", - [Label, inet_parse:ntoa(IPAddress), Port, Reason]), + [Label, rabbit_misc:ntoab(IPAddress), Port, Reason]), {stop, {cannot_listen, IPAddress, Port, Reason}} end. @@ -76,7 +77,7 @@ terminate(_Reason, #state{sock=LSock, on_shutdown = {M,F,A}, label=Label}) -> {ok, {IPAddress, Port}} = inet:sockname(LSock), gen_tcp:close(LSock), error_logger:info_msg("stopped ~s on ~s:~p~n", - [Label, inet_parse:ntoa(IPAddress), Port]), + [Label, rabbit_misc:ntoab(IPAddress), Port]), apply(M, F, A ++ [IPAddress, Port]). code_change(_OldVsn, State, _Extra) -> diff --git a/src/test_sup.erl b/src/test_sup.erl index 76be63d0..b4df1fd0 100644 --- a/src/test_sup.erl +++ b/src/test_sup.erl @@ -59,19 +59,21 @@ start_child() -> ping_child(SupPid) -> Ref = make_ref(), - get_child_pid(SupPid) ! {ping, Ref, self()}, + with_child_pid(SupPid, fun(ChildPid) -> ChildPid ! {ping, Ref, self()} end), receive {pong, Ref} -> ok after 1000 -> timeout end. exit_child(SupPid) -> - true = exit(get_child_pid(SupPid), abnormal), + with_child_pid(SupPid, fun(ChildPid) -> exit(ChildPid, abnormal) end), ok. -get_child_pid(SupPid) -> - [{_Id, ChildPid, worker, [test_sup]}] = - supervisor2:which_children(SupPid), - ChildPid. +with_child_pid(SupPid, Fun) -> + case supervisor2:which_children(SupPid) of + [{_Id, undefined, worker, [test_sup]}] -> ok; + [{_Id, ChildPid, worker, [test_sup]}] -> Fun(ChildPid); + [] -> ok + end. run_child() -> receive {ping, Ref, Pid} -> Pid ! {pong, Ref}, |