summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-03-08 11:54:56 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-03-08 11:54:56 +0000
commitc47f036b0744dd704cef1a446c16dc938835ecc1 (patch)
tree3768b386b7c3ed84e980f3e5625f25e2a815d937
parentdadc2af1e7d3987f4d60a6a275d4ce9af6d487e3 (diff)
parentc3a522c467f8884c772ca2a404036402a033e85e (diff)
downloadrabbitmq-server-c47f036b0744dd704cef1a446c16dc938835ecc1.tar.gz
Merge bug24614
-rw-r--r--src/rabbit_mirror_queue_misc.erl13
-rw-r--r--src/rabbit_net.erl7
-rw-r--r--src/rabbit_queue_index.erl50
-rw-r--r--src/rabbit_reader.erl13
-rw-r--r--src/tcp_acceptor.erl8
5 files changed, 59 insertions, 32 deletions
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index db7d8ecc..180677fe 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -134,18 +134,17 @@ add_mirror(Queue, MirrorNode) ->
Queue,
fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) ->
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
- [] -> Result = rabbit_mirror_queue_slave_sup:start_child(
- MirrorNode, [Q]),
- case Result of
+ [] -> case rabbit_mirror_queue_slave_sup:start_child(
+ MirrorNode, [Q]) of
{ok, undefined} -> %% Already running
ok;
- {ok, _Pid} ->
+ {ok, SPid} ->
rabbit_log:info(
"Adding mirror of ~s on node ~p: ~p~n",
- [rabbit_misc:rs(Name), MirrorNode, Result]),
+ [rabbit_misc:rs(Name), MirrorNode, SPid]),
ok;
- _ ->
- Result
+ Other ->
+ Other
end;
[_] -> {error, {queue_already_mirrored_on_node, MirrorNode}}
end
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index 02889b93..e6a05335 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -19,7 +19,8 @@
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
recv/1, async_recv/3, port_command/2, setopts/2, send/2, close/1,
- sockname/1, peername/1, peercert/1, connection_string/2]).
+ maybe_fast_close/1, sockname/1, peername/1, peercert/1,
+ connection_string/2]).
%%---------------------------------------------------------------------------
@@ -53,6 +54,7 @@
binary()}]) -> ok_or_any_error()).
-spec(send/2 :: (socket(), binary() | iolist()) -> ok_or_any_error()).
-spec(close/1 :: (socket()) -> ok_or_any_error()).
+-spec(maybe_fast_close/1 :: (socket()) -> ok_or_any_error()).
-spec(sockname/1 ::
(socket())
-> ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()})).
@@ -135,6 +137,9 @@ send(Sock, Data) when is_port(Sock) -> gen_tcp:send(Sock, Data).
close(Sock) when ?IS_SSL(Sock) -> ssl:close(Sock#ssl_socket.ssl);
close(Sock) when is_port(Sock) -> gen_tcp:close(Sock).
+maybe_fast_close(Sock) when ?IS_SSL(Sock) -> ok;
+maybe_fast_close(Sock) when is_port(Sock) -> erlang:port_close(Sock), ok.
+
sockname(Sock) when ?IS_SSL(Sock) -> ssl:sockname(Sock#ssl_socket.ssl);
sockname(Sock) when is_port(Sock) -> inet:sockname(Sock).
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 3d07e8b0..3ef769c7 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -21,6 +21,8 @@
publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
read/3, next_segment_boundary/1, bounds/1, recover/1]).
+-export([scan/3]).
+
-export([add_queue_ttl/0]).
-define(CLEAN_FILENAME, "clean.dot").
@@ -219,6 +221,12 @@
{non_neg_integer(), non_neg_integer(), qistate()}).
-spec(recover/1 :: ([rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}).
+-spec(scan/3 :: (file:filename(),
+ fun ((seq_id(), rabbit_types:msg_id(),
+ rabbit_types:message_properties(), boolean(),
+ ('del' | 'no_del'), ('ack' | 'no_ack'), A) -> A),
+ A) -> A).
+
-spec(add_queue_ttl/0 :: () -> 'ok').
-endif.
@@ -378,7 +386,10 @@ all_queue_directory_names(Dir) ->
%%----------------------------------------------------------------------------
blank_state(QueueName) ->
- Dir = filename:join(queues_dir(), queue_name_to_dir_name(QueueName)),
+ blank_state_dir(
+ filename:join(queues_dir(), queue_name_to_dir_name(QueueName))).
+
+blank_state_dir(Dir) ->
{ok, MaxJournal} =
application:get_env(rabbit, queue_index_max_journal_entries),
#qistate { dir = Dir,
@@ -523,19 +534,34 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
end.
queue_index_walker_reader(QueueName, Gatherer) ->
- State = #qistate { segments = Segments, dir = Dir } =
- recover_journal(blank_state(QueueName)),
- [ok = segment_entries_foldr(
- fun (_RelSeq, {{MsgId, _MsgProps, true}, _IsDelivered, no_ack},
- ok) ->
- gatherer:in(Gatherer, {MsgId, 1});
- (_RelSeq, _Value, Acc) ->
- Acc
- end, ok, segment_find_or_new(Seg, Dir, Segments)) ||
- Seg <- all_segment_nums(State)],
- {_SegmentCounts, _State} = terminate(State),
+ State = blank_state(QueueName),
+ ok = scan_segments(
+ fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok) ->
+ gatherer:in(Gatherer, {MsgId, 1});
+ (_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered,
+ _IsAcked, Acc) ->
+ Acc
+ end, ok, State),
ok = gatherer:finish(Gatherer).
+scan(Dir, Fun, Acc) ->
+ scan_segments(Fun, Acc, blank_state_dir(Dir)).
+
+scan_segments(Fun, Acc, State) ->
+ State1 = #qistate { segments = Segments, dir = Dir } =
+ recover_journal(State),
+ Result = lists:foldr(
+ fun (Seg, AccN) ->
+ segment_entries_foldr(
+ fun (RelSeq, {{MsgId, MsgProps, IsPersistent},
+ IsDelivered, IsAcked}, AccM) ->
+ Fun(reconstruct_seq_id(Seg, RelSeq), MsgId, MsgProps,
+ IsPersistent, IsDelivered, IsAcked, AccM)
+ end, AccN, segment_find_or_new(Seg, Dir, Segments))
+ end, Acc, all_segment_nums(State1)),
+ {_SegmentCounts, _State} = terminate(State1),
+ Result.
+
%%----------------------------------------------------------------------------
%% expiry/binary manipulation
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index add13043..47e796dc 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -233,12 +233,15 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
end, "closing AMQP connection ~p (~s):~n~p~n",
[self(), ConnStr, Ex])
after
- %% We don't close the socket explicitly. The reader is the
- %% controlling process and hence its termination will close
- %% the socket. Furthermore, gen_tcp:close/1 waits for pending
- %% output to be sent, which results in unnecessary delays.
+ %% The reader is the controlling process and hence its
+ %% termination will close the socket. Furthermore,
+ %% gen_tcp:close/1 waits for pending output to be sent, which
+ %% results in unnecessary delays. However, to keep the
+ %% file_handle_cache accounting as accurate as possible it
+ %% would be good to close the socket immediately if we
+ %% can. But we can only do this for non-ssl sockets.
%%
- %% gen_tcp:close(ClientSock),
+ rabbit_net:maybe_fast_close(ClientSock),
rabbit_event:notify(connection_closed, [{pid, self()}])
end,
done.
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index 43a6bc99..344196d7 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -69,13 +69,7 @@ handle_info({inet_async, LSock, Ref, {error, closed}},
handle_info({inet_async, LSock, Ref, {error, Reason}},
State=#state{sock=LSock, ref=Ref}) ->
- {AddressS, Port} = case inet:sockname(LSock) of
- {ok, {A, P}} -> {rabbit_misc:ntoab(A), P};
- {error, _} -> {"unknown", unknown}
- end,
- error_logger:error_msg("failed to accept TCP connection on ~s:~p: ~p~n",
- [AddressS, Port, Reason]),
- accept(State);
+ {stop, {accept_failed, Reason}, State};
handle_info(_Info, State) ->
{noreply, State}.