diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-03-08 11:54:56 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-03-08 11:54:56 +0000 |
commit | c47f036b0744dd704cef1a446c16dc938835ecc1 (patch) | |
tree | 3768b386b7c3ed84e980f3e5625f25e2a815d937 | |
parent | dadc2af1e7d3987f4d60a6a275d4ce9af6d487e3 (diff) | |
parent | c3a522c467f8884c772ca2a404036402a033e85e (diff) | |
download | rabbitmq-server-c47f036b0744dd704cef1a446c16dc938835ecc1.tar.gz |
Merge bug24614
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 13 | ||||
-rw-r--r-- | src/rabbit_net.erl | 7 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 50 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 13 | ||||
-rw-r--r-- | src/tcp_acceptor.erl | 8 |
5 files changed, 59 insertions, 32 deletions
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index db7d8ecc..180677fe 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -134,18 +134,17 @@ add_mirror(Queue, MirrorNode) -> Queue, fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) -> case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of - [] -> Result = rabbit_mirror_queue_slave_sup:start_child( - MirrorNode, [Q]), - case Result of + [] -> case rabbit_mirror_queue_slave_sup:start_child( + MirrorNode, [Q]) of {ok, undefined} -> %% Already running ok; - {ok, _Pid} -> + {ok, SPid} -> rabbit_log:info( "Adding mirror of ~s on node ~p: ~p~n", - [rabbit_misc:rs(Name), MirrorNode, Result]), + [rabbit_misc:rs(Name), MirrorNode, SPid]), ok; - _ -> - Result + Other -> + Other end; [_] -> {error, {queue_already_mirrored_on_node, MirrorNode}} end diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index 02889b93..e6a05335 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -19,7 +19,8 @@ -export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2, recv/1, async_recv/3, port_command/2, setopts/2, send/2, close/1, - sockname/1, peername/1, peercert/1, connection_string/2]). + maybe_fast_close/1, sockname/1, peername/1, peercert/1, + connection_string/2]). %%--------------------------------------------------------------------------- @@ -53,6 +54,7 @@ binary()}]) -> ok_or_any_error()). -spec(send/2 :: (socket(), binary() | iolist()) -> ok_or_any_error()). -spec(close/1 :: (socket()) -> ok_or_any_error()). +-spec(maybe_fast_close/1 :: (socket()) -> ok_or_any_error()). -spec(sockname/1 :: (socket()) -> ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()})). @@ -135,6 +137,9 @@ send(Sock, Data) when is_port(Sock) -> gen_tcp:send(Sock, Data). close(Sock) when ?IS_SSL(Sock) -> ssl:close(Sock#ssl_socket.ssl); close(Sock) when is_port(Sock) -> gen_tcp:close(Sock). +maybe_fast_close(Sock) when ?IS_SSL(Sock) -> ok; +maybe_fast_close(Sock) when is_port(Sock) -> erlang:port_close(Sock), ok. + sockname(Sock) when ?IS_SSL(Sock) -> ssl:sockname(Sock#ssl_socket.ssl); sockname(Sock) when is_port(Sock) -> inet:sockname(Sock). diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 3d07e8b0..3ef769c7 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -21,6 +21,8 @@ publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). +-export([scan/3]). + -export([add_queue_ttl/0]). -define(CLEAN_FILENAME, "clean.dot"). @@ -219,6 +221,12 @@ {non_neg_integer(), non_neg_integer(), qistate()}). -spec(recover/1 :: ([rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}). +-spec(scan/3 :: (file:filename(), + fun ((seq_id(), rabbit_types:msg_id(), + rabbit_types:message_properties(), boolean(), + ('del' | 'no_del'), ('ack' | 'no_ack'), A) -> A), + A) -> A). + -spec(add_queue_ttl/0 :: () -> 'ok'). -endif. @@ -378,7 +386,10 @@ all_queue_directory_names(Dir) -> %%---------------------------------------------------------------------------- blank_state(QueueName) -> - Dir = filename:join(queues_dir(), queue_name_to_dir_name(QueueName)), + blank_state_dir( + filename:join(queues_dir(), queue_name_to_dir_name(QueueName))). + +blank_state_dir(Dir) -> {ok, MaxJournal} = application:get_env(rabbit, queue_index_max_journal_entries), #qistate { dir = Dir, @@ -523,19 +534,34 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) -> end. queue_index_walker_reader(QueueName, Gatherer) -> - State = #qistate { segments = Segments, dir = Dir } = - recover_journal(blank_state(QueueName)), - [ok = segment_entries_foldr( - fun (_RelSeq, {{MsgId, _MsgProps, true}, _IsDelivered, no_ack}, - ok) -> - gatherer:in(Gatherer, {MsgId, 1}); - (_RelSeq, _Value, Acc) -> - Acc - end, ok, segment_find_or_new(Seg, Dir, Segments)) || - Seg <- all_segment_nums(State)], - {_SegmentCounts, _State} = terminate(State), + State = blank_state(QueueName), + ok = scan_segments( + fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok) -> + gatherer:in(Gatherer, {MsgId, 1}); + (_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered, + _IsAcked, Acc) -> + Acc + end, ok, State), ok = gatherer:finish(Gatherer). +scan(Dir, Fun, Acc) -> + scan_segments(Fun, Acc, blank_state_dir(Dir)). + +scan_segments(Fun, Acc, State) -> + State1 = #qistate { segments = Segments, dir = Dir } = + recover_journal(State), + Result = lists:foldr( + fun (Seg, AccN) -> + segment_entries_foldr( + fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, + IsDelivered, IsAcked}, AccM) -> + Fun(reconstruct_seq_id(Seg, RelSeq), MsgId, MsgProps, + IsPersistent, IsDelivered, IsAcked, AccM) + end, AccN, segment_find_or_new(Seg, Dir, Segments)) + end, Acc, all_segment_nums(State1)), + {_SegmentCounts, _State} = terminate(State1), + Result. + %%---------------------------------------------------------------------------- %% expiry/binary manipulation %%---------------------------------------------------------------------------- diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index add13043..47e796dc 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -233,12 +233,15 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, end, "closing AMQP connection ~p (~s):~n~p~n", [self(), ConnStr, Ex]) after - %% We don't close the socket explicitly. The reader is the - %% controlling process and hence its termination will close - %% the socket. Furthermore, gen_tcp:close/1 waits for pending - %% output to be sent, which results in unnecessary delays. + %% The reader is the controlling process and hence its + %% termination will close the socket. Furthermore, + %% gen_tcp:close/1 waits for pending output to be sent, which + %% results in unnecessary delays. However, to keep the + %% file_handle_cache accounting as accurate as possible it + %% would be good to close the socket immediately if we + %% can. But we can only do this for non-ssl sockets. %% - %% gen_tcp:close(ClientSock), + rabbit_net:maybe_fast_close(ClientSock), rabbit_event:notify(connection_closed, [{pid, self()}]) end, done. diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index 43a6bc99..344196d7 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -69,13 +69,7 @@ handle_info({inet_async, LSock, Ref, {error, closed}}, handle_info({inet_async, LSock, Ref, {error, Reason}}, State=#state{sock=LSock, ref=Ref}) -> - {AddressS, Port} = case inet:sockname(LSock) of - {ok, {A, P}} -> {rabbit_misc:ntoab(A), P}; - {error, _} -> {"unknown", unknown} - end, - error_logger:error_msg("failed to accept TCP connection on ~s:~p: ~p~n", - [AddressS, Port, Reason]), - accept(State); + {stop, {accept_failed, Reason}, State}; handle_info(_Info, State) -> {noreply, State}. |