summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/delegate.erl27
-rw-r--r--src/delegate_sup.erl2
-rw-r--r--src/rabbit.erl10
-rw-r--r--src/rabbit_amqqueue.erl26
-rw-r--r--src/rabbit_amqqueue_process.erl8
-rw-r--r--src/rabbit_auth_mechanism_plain.erl35
-rw-r--r--src/rabbit_binding.erl2
-rw-r--r--src/rabbit_channel.erl61
-rw-r--r--src/rabbit_channel_sup.erl20
-rw-r--r--src/rabbit_client_sup.erl (renamed from src/tcp_client_sup.erl)22
-rw-r--r--src/rabbit_direct.erl75
-rw-r--r--src/rabbit_msg_store.erl103
-rw-r--r--src/rabbit_networking.erl6
-rw-r--r--src/rabbit_reader.erl4
-rw-r--r--src/supervisor2.erl6
-rw-r--r--src/test_sup.erl14
16 files changed, 297 insertions, 124 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index ff55a15b..46bd8245 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server2).
--export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/0]).
+-export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -36,7 +36,7 @@
([pid()], fun ((pid()) -> A)) -> {[{pid(), A}],
[{pid(), term()}]}).
--spec(delegate_count/0 :: () -> non_neg_integer()).
+-spec(delegate_count/1 :: ([node()]) -> non_neg_integer()).
-endif.
@@ -68,9 +68,9 @@ invoke(Pids, Fun) when is_list(Pids) ->
{Replies, BadNodes} =
case orddict:fetch_keys(Grouped) of
[] -> {[], []};
- RemoteNodes -> gen_server2:multi_call(RemoteNodes, delegate(),
- {invoke, Fun, Grouped},
- infinity)
+ RemoteNodes -> gen_server2:multi_call(
+ RemoteNodes, delegate(RemoteNodes),
+ {invoke, Fun, Grouped}, infinity)
end,
BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} ||
BadNode <- BadNodes,
@@ -92,7 +92,7 @@ invoke_no_result(Pids, Fun) when is_list(Pids) ->
{LocalPids, Grouped} = group_pids_by_node(Pids),
case orddict:fetch_keys(Grouped) of
[] -> ok;
- RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(),
+ RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(RemoteNodes),
{invoke, Fun, Grouped})
end,
safe_invoke(LocalPids, Fun), %% must not die
@@ -111,17 +111,22 @@ group_pids_by_node(Pids) ->
node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)}
end, {[], orddict:new()}, Pids).
-delegate_count() ->
- {ok, Count} = application:get_env(rabbit, delegate_count),
+delegate_count([RemoteNode | _]) ->
+ {ok, Count} = case application:get_env(rabbit, delegate_count) of
+ undefined -> rpc:call(RemoteNode, application, get_env,
+ [rabbit, delegate_count]);
+ Result -> Result
+ end,
Count.
delegate_name(Hash) ->
list_to_atom("delegate_" ++ integer_to_list(Hash)).
-delegate() ->
+delegate(RemoteNodes) ->
case get(delegate) of
- undefined -> Name = delegate_name(
- erlang:phash2(self(), delegate_count())),
+ undefined -> Name =
+ delegate_name(erlang:phash2(
+ self(), delegate_count(RemoteNodes))),
put(delegate, Name),
Name;
Name -> Name
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
index 52747221..e0ffa7c8 100644
--- a/src/delegate_sup.erl
+++ b/src/delegate_sup.erl
@@ -40,7 +40,7 @@ start_link() ->
%%----------------------------------------------------------------------------
init(_Args) ->
- DCount = delegate:delegate_count(),
+ DCount = delegate:delegate_count([node()]),
{ok, {{one_for_one, 10, 10},
[{Num, {delegate, start_link, [Num]},
transient, 16#ffffffff, worker, [delegate]} ||
diff --git a/src/rabbit.erl b/src/rabbit.erl
index fb46e3f2..9a938d10 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -145,13 +145,13 @@
{requires, routing_ready},
{enables, networking}]}).
+-rabbit_boot_step({direct_client,
+ [{mfa, {rabbit_direct, boot, []}},
+ {requires, log_relay}]}).
+
-rabbit_boot_step({networking,
[{mfa, {rabbit_networking, boot, []}},
- {requires, log_relay},
- {enables, networking_listening}]}).
-
--rabbit_boot_step({networking_listening,
- [{description, "network listeners available"}]}).
+ {requires, log_relay}]}).
%%---------------------------------------------------------------------------
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index ad9e3ce6..a6da551d 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -137,7 +137,9 @@
-> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())).
-spec(internal_delete/1 ::
(name()) -> rabbit_types:ok_or_error('not_found') |
- rabbit_types:connection_exit()).
+ rabbit_types:connection_exit() |
+ fun ((boolean()) -> rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit())).
-spec(maybe_run_queue_via_backing_queue/2 ::
(pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
-spec(maybe_run_queue_via_backing_queue_async/2 ::
@@ -215,8 +217,12 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
[_] -> %% Q exists on stopped node
rabbit_misc:const(not_found)
end;
- [ExistingQ] ->
- rabbit_misc:const(ExistingQ)
+ [ExistingQ = #amqqueue{pid = QPid}] ->
+ case is_process_alive(QPid) of
+ true -> rabbit_misc:const(ExistingQ);
+ false -> TailFun = internal_delete(QueueName),
+ fun (Tx) -> TailFun(Tx), ExistingQ end
+ end
end
end).
@@ -432,17 +438,15 @@ internal_delete1(QueueName) ->
rabbit_binding:remove_for_destination(QueueName).
internal_delete(QueueName) ->
- rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
- [] -> {error, not_found};
- [_] -> internal_delete1(QueueName)
+ [] -> rabbit_misc:const({error, not_found});
+ [_] -> Deletions = internal_delete1(QueueName),
+ fun (Tx) -> ok = rabbit_binding:process_deletions(
+ Deletions, Tx)
+ end
end
- end,
- fun ({error, _} = Err, _Tx) ->
- Err;
- (Deletions, Tx) ->
- ok = rabbit_binding:process_deletions(Deletions, Tx)
end).
maybe_run_queue_via_backing_queue(QPid, Fun) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 0346ec7d..3418c663 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1149,15 +1149,15 @@ handle_pre_hibernate(State = #q{backing_queue_state = undefined}) ->
handle_pre_hibernate(State = #q{backing_queue = BQ,
backing_queue_state = BQS,
stats_timer = StatsTimer}) ->
- BQS1 = BQ:handle_pre_hibernate(BQS),
- %% no activity for a while == 0 egress and ingress rates
+ {RamDuration, BQS1} = BQ:ram_duration(BQS),
DesiredDuration =
- rabbit_memory_monitor:report_ram_duration(self(), infinity),
+ rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
+ BQS3 = BQ:handle_pre_hibernate(BQS2),
rabbit_event:if_enabled(StatsTimer,
fun () ->
emit_stats(State, [{idle_since, now()}])
end),
State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer),
- backing_queue_state = BQS2},
+ backing_queue_state = BQS3},
{hibernate, stop_rate_timer(State1)}.
diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl
index 7d9dcd20..1ca07018 100644
--- a/src/rabbit_auth_mechanism_plain.erl
+++ b/src/rabbit_auth_mechanism_plain.erl
@@ -33,6 +33,10 @@
%% SASL PLAIN, as used by the Qpid Java client and our clients. Also,
%% apparently, by OpenAMQ.
+%% TODO: once the minimum erlang becomes R13B03, reimplement this
+%% using the binary module - that makes use of BIFs to do binary
+%% matching and will thus be much faster.
+
description() ->
[{name, <<"PLAIN">>},
{description, <<"SASL PLAIN authentication mechanism">>}].
@@ -41,11 +45,32 @@ init(_Sock) ->
[].
handle_response(Response, _State) ->
- %% The '%%"' at the end of the next line is for Emacs
- case re:run(Response, "^\\0([^\\0]*)\\0([^\\0]*)$",%%"
- [{capture, all_but_first, binary}]) of
- {match, [User, Pass]} ->
+ case extract_user_pass(Response) of
+ {ok, User, Pass} ->
rabbit_access_control:check_user_pass_login(User, Pass);
- _ ->
+ error ->
{protocol_error, "response ~p invalid", [Response]}
end.
+
+extract_user_pass(Response) ->
+ case extract_elem(Response) of
+ {ok, User, Response1} -> case extract_elem(Response1) of
+ {ok, Pass, <<>>} -> {ok, User, Pass};
+ _ -> error
+ end;
+ error -> error
+ end.
+
+extract_elem(<<0:8, Rest/binary>>) ->
+ Count = next_null_pos(Rest),
+ <<Elem:Count/binary, Rest1/binary>> = Rest,
+ {ok, Elem, Rest1};
+extract_elem(_) ->
+ error.
+
+next_null_pos(Bin) ->
+ next_null_pos(Bin, 0).
+
+next_null_pos(<<>>, Count) -> Count;
+next_null_pos(<<0:8, _Rest/binary>>, Count) -> Count;
+next_null_pos(<<_:8, Rest/binary>>, Count) -> next_null_pos(Rest, Count + 1).
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index b270927b..96a22dca 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -331,7 +331,7 @@ group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings) ->
group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc), Removed).
maybe_auto_delete(XName, Bindings, Deletions) ->
- case mnesia:read(rabbit_exchange, XName) of
+ case mnesia:read({rabbit_exchange, XName}) of
[] ->
add_deletion(XName, {undefined, not_deleted, Bindings}, Deletions);
[X] ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 91559ea6..a82e5eff 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -277,7 +277,7 @@ handle_cast({confirm, MsgSeqNos, From}, State) ->
handle_info(timeout, State) ->
noreply(State);
-handle_info({'DOWN', _MRef, process, QPid, _Reason},
+handle_info({'DOWN', _MRef, process, QPid, Reason},
State = #ch{unconfirmed = UC}) ->
%% TODO: this does a complete scan and partial rebuild of the
%% tree, which is quite efficient. To do better we'd need to
@@ -286,8 +286,11 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
gb_trees:next(gb_trees:iterator(UC)), QPid,
{[], UC}, State),
erase_queue_stats(QPid),
- noreply(
- queue_blocked(QPid, record_confirms(MXs, State#ch{unconfirmed = UC1}))).
+ State1 = case Reason of
+ normal -> record_confirms(MXs, State#ch{unconfirmed = UC1});
+ _ -> send_nacks(MXs, State#ch{unconfirmed = UC1})
+ end,
+ noreply(queue_blocked(QPid, State1)).
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
@@ -505,6 +508,8 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
remove_qmsg(MsgSeqNo, QPid, {XName, Qs}, {MXs, UC}, State) ->
Qs1 = sets:del_element(QPid, Qs),
+ %% these confirms will be emitted even when a queue dies, but that
+ %% should be fine, since the queue stats get erased immediately
maybe_incr_stats([{{QPid, XName}, 1}], confirm, State),
case sets:size(Qs1) of
0 -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UC)};
@@ -735,16 +740,22 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
{reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}};
handle_method(#'basic.recover_async'{requeue = true},
- _, State = #ch{unacked_message_q = UAMQ}) ->
+ _, State = #ch{unacked_message_q = UAMQ,
+ limiter_pid = LimiterPid}) ->
+ OkFun = fun () -> ok end,
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
%% The Qpid python test suite incorrectly assumes
%% that messages will be requeued in their original
%% order. To keep it happy we reverse the id list
%% since we are given them in reverse order.
- rabbit_amqqueue:requeue(
- QPid, lists:reverse(MsgIds), self())
+ rabbit_misc:with_exit_handler(
+ OkFun, fun () ->
+ rabbit_amqqueue:requeue(
+ QPid, lists:reverse(MsgIds), self())
+ end)
end, ok, UAMQ),
+ ok = notify_limiter(LimiterPid, UAMQ),
%% No answer required - basic.recover is the newer, synchronous
%% variant of this method
{noreply, State#ch{unacked_message_q = queue:new()}};
@@ -1249,6 +1260,16 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
lock_message(false, _MsgStruct, State) ->
State.
+send_nacks([], State) ->
+ State;
+send_nacks(MXs, State) ->
+ MsgSeqNos = [ MsgSeqNo || {MsgSeqNo, _} <- MXs ],
+ coalesce_and_send(MsgSeqNos,
+ fun(MsgSeqNo, Multiple) ->
+ #'basic.nack'{delivery_tag = MsgSeqNo,
+ multiple = Multiple}
+ end, State).
+
send_confirms(State = #ch{confirmed = C}) ->
C1 = lists:append(C),
MsgSeqNos = [ begin maybe_incr_stats([{ExchangeName, 1}], confirm, State),
@@ -1258,28 +1279,32 @@ send_confirms(State = #ch{confirmed = C}) ->
send_confirms([], State) ->
State;
send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) ->
- send_confirm(MsgSeqNo, WriterPid),
+ ok = rabbit_writer:send_command(WriterPid,
+ #'basic.ack'{delivery_tag = MsgSeqNo}),
State;
-send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
- SCs = lists:usort(Cs),
+send_confirms(Cs, State) ->
+ coalesce_and_send(Cs, fun(MsgSeqNo, Multiple) ->
+ #'basic.ack'{delivery_tag = MsgSeqNo,
+ multiple = Multiple}
+ end, State).
+
+coalesce_and_send(MsgSeqNos, MkMsgFun,
+ State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
+ SMsgSeqNos = lists:usort(MsgSeqNos),
CutOff = case gb_trees:is_empty(UC) of
- true -> lists:last(SCs) + 1;
+ true -> lists:last(SMsgSeqNos) + 1;
false -> {SeqNo, _XQ} = gb_trees:smallest(UC), SeqNo
end,
- {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SCs),
+ {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos),
case Ms of
[] -> ok;
_ -> ok = rabbit_writer:send_command(
- WriterPid, #'basic.ack'{delivery_tag = lists:last(Ms),
- multiple = true})
+ WriterPid, MkMsgFun(lists:last(Ms), true))
end,
- [ok = send_confirm(SeqNo, WriterPid) || SeqNo <- Ss],
+ [ok = rabbit_writer:send_command(
+ WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss],
State.
-send_confirm(SeqNo, WriterPid) ->
- ok = rabbit_writer:send_command(WriterPid,
- #'basic.ack'{delivery_tag = SeqNo}).
-
terminate(_State) ->
pg_local:leave(rabbit_channels, self()),
rabbit_event:notify(channel_closed, [{pid, self()}]).
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index d426d55d..d21cfdb7 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -31,9 +31,11 @@
-export_type([start_link_args/0]).
-type(start_link_args() ::
- {rabbit_types:protocol(), rabbit_net:socket(),
+ {'tcp', rabbit_types:protocol(), rabbit_net:socket(),
rabbit_channel:channel_number(), non_neg_integer(), pid(),
- rabbit_types:user(), rabbit_types:vhost(), pid()}).
+ rabbit_types:user(), rabbit_types:vhost(), pid()} |
+ {'direct', rabbit_channel:channel_number(), pid(), rabbit_types:user(),
+ rabbit_types:vhost(), pid()}).
-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}).
@@ -41,7 +43,7 @@
%%----------------------------------------------------------------------------
-start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
+start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
Collector}) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, WriterPid} =
@@ -58,7 +60,17 @@ start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
Collector, start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, AState} = rabbit_command_assembler:init(Protocol),
- {ok, SupPid, {ChannelPid, AState}}.
+ {ok, SupPid, {ChannelPid, AState}};
+start_link({direct, Channel, ClientChannelPid, User, VHost, Collector}) ->
+ {ok, SupPid} = supervisor2:start_link(?MODULE, []),
+ {ok, ChannelPid} =
+ supervisor2:start_child(
+ SupPid,
+ {channel, {rabbit_channel, start_link,
+ [Channel, ClientChannelPid, ClientChannelPid,
+ User, VHost, Collector, start_limiter_fun(SupPid)]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
+ {ok, SupPid, {ChannelPid, none}}.
%%----------------------------------------------------------------------------
diff --git a/src/tcp_client_sup.erl b/src/rabbit_client_sup.erl
index 1c2bbb65..dbdc6cd4 100644
--- a/src/tcp_client_sup.erl
+++ b/src/rabbit_client_sup.erl
@@ -14,7 +14,7 @@
%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
%%
--module(tcp_client_sup).
+-module(rabbit_client_sup).
-behaviour(supervisor2).
@@ -22,6 +22,21 @@
-export([init/1]).
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/1 :: (mfa()) ->
+ rabbit_types:ok_pid_or_error()).
+-spec(start_link/2 :: ({'local', atom()}, mfa()) ->
+ rabbit_types:ok_pid_or_error()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
start_link(Callback) ->
supervisor2:start_link(?MODULE, Callback).
@@ -29,6 +44,5 @@ start_link(SupName, Callback) ->
supervisor2:start_link(SupName, ?MODULE, Callback).
init({M,F,A}) ->
- {ok, {{simple_one_for_one_terminate, 10, 10},
- [{tcp_client, {M,F,A},
- temporary, infinity, supervisor, [M]}]}}.
+ {ok, {{simple_one_for_one_terminate, 0, 1},
+ [{client, {M,F,A}, temporary, infinity, supervisor, [M]}]}}.
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
new file mode 100644
index 00000000..3b8c9fba
--- /dev/null
+++ b/src/rabbit_direct.erl
@@ -0,0 +1,75 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_direct).
+
+-export([boot/0, connect/3, start_channel/5]).
+
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(boot/0 :: () -> 'ok').
+-spec(connect/3 :: (binary(), binary(), binary()) ->
+ {'ok', {rabbit_types:user(),
+ rabbit_framing:amqp_table()}}).
+-spec(start_channel/5 :: (rabbit_channel:channel_number(), pid(),
+ rabbit_types:user(), rabbit_types:vhost(), pid()) ->
+ {'ok', pid()}).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+boot() ->
+ {ok, _} =
+ supervisor2:start_child(
+ rabbit_sup,
+ {rabbit_direct_client_sup,
+ {rabbit_client_sup, start_link,
+ [{local, rabbit_direct_client_sup},
+ {rabbit_channel_sup, start_link, []}]},
+ transient, infinity, supervisor, [rabbit_client_sup]}),
+ ok.
+
+%%----------------------------------------------------------------------------
+
+connect(Username, Password, VHost) ->
+ case lists:keymember(rabbit, 1, application:which_applications()) of
+ true ->
+ try rabbit_access_control:user_pass_login(Username, Password) of
+ #user{} = User ->
+ try rabbit_access_control:check_vhost_access(User, VHost) of
+ ok -> {ok, {User, rabbit_reader:server_properties()}}
+ catch
+ exit:#amqp_error{name = access_refused} ->
+ {error, access_refused}
+ end
+ catch
+ exit:#amqp_error{name = access_refused} -> {error, auth_failure}
+ end;
+ false ->
+ {error, broker_not_found_on_node}
+ end.
+
+start_channel(Number, ClientChannelPid, User, VHost, Collector) ->
+ {ok, _, {ChannelPid, _}} =
+ supervisor2:start_child(
+ rabbit_direct_client_sup,
+ [{direct, Number, ClientChannelPid, User, VHost, Collector}]),
+ {ok, ChannelPid}.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 529e3e07..e9c356e1 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -738,45 +738,36 @@ handle_call({contains, Guid}, From, State) ->
handle_cast({client_dying, CRef},
State = #msstate { dying_clients = DyingClients }) ->
DyingClients1 = sets:add_element(CRef, DyingClients),
- write_message(CRef, <<>>, State #msstate { dying_clients = DyingClients1 });
+ noreply(write_message(CRef, <<>>,
+ State #msstate { dying_clients = DyingClients1 }));
handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) ->
State1 = State #msstate { clients = dict:erase(CRef, Clients) },
noreply(remove_message(CRef, CRef, clear_client(CRef, State1)));
handle_cast({write, CRef, Guid},
- State = #msstate { file_summary_ets = FileSummaryEts,
- cur_file_cache_ets = CurFileCacheEts }) ->
+ State = #msstate { cur_file_cache_ets = CurFileCacheEts }) ->
true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
[{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
- case should_mask_action(CRef, Guid, State) of
- {true, _Location} ->
- noreply(State);
- {false, not_found} ->
- write_message(CRef, Guid, Msg, State);
- {Mask, #msg_location { ref_count = 0, file = File,
- total_size = TotalSize }} ->
- case {Mask, ets:lookup(FileSummaryEts, File)} of
- {false, [#file_summary { locked = true }]} ->
- ok = index_delete(Guid, State),
- write_message(CRef, Guid, Msg, State);
- {false_if_increment, [#file_summary { locked = true }]} ->
- %% The msg for Guid is older than the client death
- %% message, but as it is being GC'd currently,
- %% we'll have to write a new copy, which will then
- %% be younger, so ignore this write.
- noreply(State);
- {_Mask, [#file_summary {}]} ->
- ok = index_update_ref_count(Guid, 1, State),
- State1 = client_confirm_if_on_disk(CRef, Guid, File, State),
- noreply(adjust_valid_total_size(File, TotalSize, State1))
- end;
- {_Mask, #msg_location { ref_count = RefCount, file = File }} ->
- %% We already know about it, just update counter. Only
- %% update field otherwise bad interaction with concurrent GC
- ok = index_update_ref_count(Guid, RefCount + 1, State),
- noreply(client_confirm_if_on_disk(CRef, Guid, File, State))
- end;
+ noreply(
+ case write_action(should_mask_action(CRef, Guid, State), Guid, State) of
+ {write, State1} ->
+ write_message(CRef, Guid, Msg, State1);
+ {ignore, CurFile, State1 = #msstate { current_file = CurFile }} ->
+ State1;
+ {ignore, _File, State1} ->
+ true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}),
+ State1;
+ {confirm, CurFile, State1 = #msstate { current_file = CurFile }}->
+ record_pending_confirm(CRef, Guid, State1);
+ {confirm, _File, State1} ->
+ true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}),
+ update_pending_confirms(
+ fun (MsgOnDiskFun, CTG) ->
+ MsgOnDiskFun(gb_sets:singleton(Guid), written),
+ CTG
+ end, CRef, State1)
+ end);
handle_cast({remove, CRef, Guids}, State) ->
State1 = lists:foldl(
@@ -924,6 +915,37 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
[client_confirm(CRef, Guids, written, State1) || {CRef, Guids} <- CGs],
State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }.
+write_action({true, not_found}, _Guid, State) ->
+ {ignore, undefined, State};
+write_action({true, #msg_location { file = File }}, _Guid, State) ->
+ {ignore, File, State};
+write_action({false, not_found}, _Guid, State) ->
+ {write, State};
+write_action({Mask, #msg_location { ref_count = 0, file = File,
+ total_size = TotalSize }},
+ Guid, State = #msstate { file_summary_ets = FileSummaryEts }) ->
+ case {Mask, ets:lookup(FileSummaryEts, File)} of
+ {false, [#file_summary { locked = true }]} ->
+ ok = index_delete(Guid, State),
+ {write, State};
+ {false_if_increment, [#file_summary { locked = true }]} ->
+ %% The msg for Guid is older than the client death
+ %% message, but as it is being GC'd currently we'll have
+ %% to write a new copy, which will then be younger, so
+ %% ignore this write.
+ {ignore, File, State};
+ {_Mask, [#file_summary {}]} ->
+ ok = index_update_ref_count(Guid, 1, State),
+ State1 = adjust_valid_total_size(File, TotalSize, State),
+ {confirm, File, State1}
+ end;
+write_action({_Mask, #msg_location { ref_count = RefCount, file = File }},
+ Guid, State) ->
+ ok = index_update_ref_count(Guid, RefCount + 1, State),
+ %% We already know about it, just update counter. Only update
+ %% field otherwise bad interaction with concurrent GC
+ {confirm, File, State}.
+
write_message(CRef, Guid, Msg, State) ->
write_message(Guid, Msg, record_pending_confirm(CRef, Guid, State)).
@@ -943,11 +965,10 @@ write_message(Guid, Msg,
[_,_] = ets:update_counter(FileSummaryEts, CurFile,
[{#file_summary.valid_total_size, TotalSize},
{#file_summary.file_size, TotalSize}]),
- NextOffset = CurOffset + TotalSize,
- noreply(maybe_roll_to_new_file(
- NextOffset, State #msstate {
- sum_valid_data = SumValid + TotalSize,
- sum_file_size = SumFileSize + TotalSize })).
+ maybe_roll_to_new_file(CurOffset + TotalSize,
+ State #msstate {
+ sum_valid_data = SumValid + TotalSize,
+ sum_file_size = SumFileSize + TotalSize }).
read_message(Guid, From,
State = #msstate { dedup_cache_ets = DedupCacheEts }) ->
@@ -1134,16 +1155,6 @@ record_pending_confirm(CRef, Guid, State) ->
gb_sets:singleton(Guid), CTG)
end, CRef, State).
-client_confirm_if_on_disk(CRef, Guid, CurFile,
- State = #msstate { current_file = CurFile }) ->
- record_pending_confirm(CRef, Guid, State);
-client_confirm_if_on_disk(CRef, Guid, _File, State) ->
- update_pending_confirms(
- fun (MsgOnDiskFun, CTG) ->
- MsgOnDiskFun(gb_sets:singleton(Guid), written),
- CTG
- end, CRef, State).
-
client_confirm(CRef, Guids, ActionTaken, State) ->
update_pending_confirms(
fun (MsgOnDiskFun, CTG) ->
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 031d4f18..283d25c7 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -115,13 +115,13 @@ boot_ssl() ->
end.
start() ->
- {ok,_} = supervisor:start_child(
+ {ok,_} = supervisor2:start_child(
rabbit_sup,
{rabbit_tcp_client_sup,
- {tcp_client_sup, start_link,
+ {rabbit_client_sup, start_link,
[{local, rabbit_tcp_client_sup},
{rabbit_connection_sup,start_link,[]}]},
- transient, infinity, supervisor, [tcp_client_sup]}),
+ transient, infinity, supervisor, [rabbit_client_sup]}),
ok.
%% inet_parse:address takes care of ip string, like "0.0.0.0"
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 52e0bb9d..475c415e 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -940,8 +940,8 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
vhost = VHost}} = State,
{ok, _ChSupPid, {ChPid, AState}} =
rabbit_channel_sup_sup:start_channel(
- ChanSupSup, {Protocol, Sock, Channel, FrameMax,
- self(), User, VHost, Collector}),
+ ChanSupSup, {tcp, Protocol, Sock, Channel, FrameMax, self(), User,
+ VHost, Collector}),
erlang:monitor(process, ChPid),
NewAState = process_channel_frame(AnalyzedFrame, self(),
Channel, ChPid, AState),
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 18e2bdad..1a240856 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -359,8 +359,8 @@ handle_cast({delayed_restart, {RestartType, Reason, Child}}, State)
{noreply, NState};
handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) ->
case get_child(Child#child.name, State) of
- {value, Child} ->
- {ok, NState} = do_restart(RestartType, Reason, Child, State),
+ {value, Child1} ->
+ {ok, NState} = do_restart(RestartType, Reason, Child1, State),
{noreply, NState};
_ ->
{noreply, State}
@@ -539,7 +539,7 @@ do_restart({RestartType, Delay}, Reason, Child, State) ->
{ok, _TRef} = timer:apply_after(
trunc(Delay*1000), ?MODULE, delayed_restart,
[self(), {{RestartType, Delay}, Reason, Child}]),
- {ok, NState}
+ {ok, state_del_child(Child, NState)}
end;
do_restart(permanent, Reason, Child, State) ->
report_error(child_terminated, Reason, Child, State#state.name),
diff --git a/src/test_sup.erl b/src/test_sup.erl
index 76be63d0..b4df1fd0 100644
--- a/src/test_sup.erl
+++ b/src/test_sup.erl
@@ -59,19 +59,21 @@ start_child() ->
ping_child(SupPid) ->
Ref = make_ref(),
- get_child_pid(SupPid) ! {ping, Ref, self()},
+ with_child_pid(SupPid, fun(ChildPid) -> ChildPid ! {ping, Ref, self()} end),
receive {pong, Ref} -> ok
after 1000 -> timeout
end.
exit_child(SupPid) ->
- true = exit(get_child_pid(SupPid), abnormal),
+ with_child_pid(SupPid, fun(ChildPid) -> exit(ChildPid, abnormal) end),
ok.
-get_child_pid(SupPid) ->
- [{_Id, ChildPid, worker, [test_sup]}] =
- supervisor2:which_children(SupPid),
- ChildPid.
+with_child_pid(SupPid, Fun) ->
+ case supervisor2:which_children(SupPid) of
+ [{_Id, undefined, worker, [test_sup]}] -> ok;
+ [{_Id, ChildPid, worker, [test_sup]}] -> Fun(ChildPid);
+ [] -> ok
+ end.
run_child() ->
receive {ping, Ref, Pid} -> Pid ! {pong, Ref},