diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-01-28 17:08:31 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-01-28 17:08:31 +0000 |
commit | 8a9c48c119704a6be63acdc03c805d20bd84286c (patch) | |
tree | dd44624fec468ae51df37914870872cb4d3d53c7 | |
parent | f253de6c4c6fe2c9460835f475f963479a4fba4d (diff) | |
parent | 2924a63b5dace9e3ae1c8f6bdee6188f4f1ff33c (diff) | |
download | rabbitmq-server-8a9c48c119704a6be63acdc03c805d20bd84286c.tar.gz |
merging bug23696 to default
-rw-r--r-- | ebin/rabbit_app.in | 3 | ||||
-rw-r--r-- | src/delegate.erl | 27 | ||||
-rw-r--r-- | src/delegate_sup.erl | 2 | ||||
-rw-r--r-- | src/rabbit.erl | 10 | ||||
-rw-r--r-- | src/rabbit_channel_sup.erl | 20 | ||||
-rw-r--r-- | src/rabbit_client_sup.erl (renamed from src/tcp_client_sup.erl) | 22 | ||||
-rw-r--r-- | src/rabbit_direct.erl | 75 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 6 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 4 |
9 files changed, 138 insertions, 31 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index d3e9d84b..cc7221d6 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -8,7 +8,8 @@ rabbit_node_monitor, rabbit_router, rabbit_sup, - rabbit_tcp_client_sup]}, + rabbit_tcp_client_sup, + rabbit_direct_client_sup]}, {applications, [kernel, stdlib, sasl, mnesia, os_mon]}, %% we also depend on crypto, public_key and ssl but they shouldn't be %% in here as we don't actually want to start it diff --git a/src/delegate.erl b/src/delegate.erl index ff55a15b..46bd8245 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -18,7 +18,7 @@ -behaviour(gen_server2). --export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/0]). +-export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -36,7 +36,7 @@ ([pid()], fun ((pid()) -> A)) -> {[{pid(), A}], [{pid(), term()}]}). --spec(delegate_count/0 :: () -> non_neg_integer()). +-spec(delegate_count/1 :: ([node()]) -> non_neg_integer()). -endif. @@ -68,9 +68,9 @@ invoke(Pids, Fun) when is_list(Pids) -> {Replies, BadNodes} = case orddict:fetch_keys(Grouped) of [] -> {[], []}; - RemoteNodes -> gen_server2:multi_call(RemoteNodes, delegate(), - {invoke, Fun, Grouped}, - infinity) + RemoteNodes -> gen_server2:multi_call( + RemoteNodes, delegate(RemoteNodes), + {invoke, Fun, Grouped}, infinity) end, BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} || BadNode <- BadNodes, @@ -92,7 +92,7 @@ invoke_no_result(Pids, Fun) when is_list(Pids) -> {LocalPids, Grouped} = group_pids_by_node(Pids), case orddict:fetch_keys(Grouped) of [] -> ok; - RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(), + RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(RemoteNodes), {invoke, Fun, Grouped}) end, safe_invoke(LocalPids, Fun), %% must not die @@ -111,17 +111,22 @@ group_pids_by_node(Pids) -> node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)} end, {[], orddict:new()}, Pids). -delegate_count() -> - {ok, Count} = application:get_env(rabbit, delegate_count), +delegate_count([RemoteNode | _]) -> + {ok, Count} = case application:get_env(rabbit, delegate_count) of + undefined -> rpc:call(RemoteNode, application, get_env, + [rabbit, delegate_count]); + Result -> Result + end, Count. delegate_name(Hash) -> list_to_atom("delegate_" ++ integer_to_list(Hash)). -delegate() -> +delegate(RemoteNodes) -> case get(delegate) of - undefined -> Name = delegate_name( - erlang:phash2(self(), delegate_count())), + undefined -> Name = + delegate_name(erlang:phash2( + self(), delegate_count(RemoteNodes))), put(delegate, Name), Name; Name -> Name diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl index 52747221..e0ffa7c8 100644 --- a/src/delegate_sup.erl +++ b/src/delegate_sup.erl @@ -40,7 +40,7 @@ start_link() -> %%---------------------------------------------------------------------------- init(_Args) -> - DCount = delegate:delegate_count(), + DCount = delegate:delegate_count([node()]), {ok, {{one_for_one, 10, 10}, [{Num, {delegate, start_link, [Num]}, transient, 16#ffffffff, worker, [delegate]} || diff --git a/src/rabbit.erl b/src/rabbit.erl index b041a637..c6661d39 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -145,13 +145,13 @@ {requires, routing_ready}, {enables, networking}]}). +-rabbit_boot_step({direct_client, + [{mfa, {rabbit_direct, boot, []}}, + {requires, log_relay}]}). + -rabbit_boot_step({networking, [{mfa, {rabbit_networking, boot, []}}, - {requires, log_relay}, - {enables, networking_listening}]}). - --rabbit_boot_step({networking_listening, - [{description, "network listeners available"}]}). + {requires, log_relay}]}). %%--------------------------------------------------------------------------- diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index d426d55d..d21cfdb7 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -31,9 +31,11 @@ -export_type([start_link_args/0]). -type(start_link_args() :: - {rabbit_types:protocol(), rabbit_net:socket(), + {'tcp', rabbit_types:protocol(), rabbit_net:socket(), rabbit_channel:channel_number(), non_neg_integer(), pid(), - rabbit_types:user(), rabbit_types:vhost(), pid()}). + rabbit_types:user(), rabbit_types:vhost(), pid()} | + {'direct', rabbit_channel:channel_number(), pid(), rabbit_types:user(), + rabbit_types:vhost(), pid()}). -spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}). @@ -41,7 +43,7 @@ %%---------------------------------------------------------------------------- -start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, +start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, Collector}) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, WriterPid} = @@ -58,7 +60,17 @@ start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, Collector, start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, AState} = rabbit_command_assembler:init(Protocol), - {ok, SupPid, {ChannelPid, AState}}. + {ok, SupPid, {ChannelPid, AState}}; +start_link({direct, Channel, ClientChannelPid, User, VHost, Collector}) -> + {ok, SupPid} = supervisor2:start_link(?MODULE, []), + {ok, ChannelPid} = + supervisor2:start_child( + SupPid, + {channel, {rabbit_channel, start_link, + [Channel, ClientChannelPid, ClientChannelPid, + User, VHost, Collector, start_limiter_fun(SupPid)]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), + {ok, SupPid, {ChannelPid, none}}. %%---------------------------------------------------------------------------- diff --git a/src/tcp_client_sup.erl b/src/rabbit_client_sup.erl index 1c2bbb65..dbdc6cd4 100644 --- a/src/tcp_client_sup.erl +++ b/src/rabbit_client_sup.erl @@ -14,7 +14,7 @@ %% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. %% --module(tcp_client_sup). +-module(rabbit_client_sup). -behaviour(supervisor2). @@ -22,6 +22,21 @@ -export([init/1]). +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/1 :: (mfa()) -> + rabbit_types:ok_pid_or_error()). +-spec(start_link/2 :: ({'local', atom()}, mfa()) -> + rabbit_types:ok_pid_or_error()). + +-endif. + +%%---------------------------------------------------------------------------- + start_link(Callback) -> supervisor2:start_link(?MODULE, Callback). @@ -29,6 +44,5 @@ start_link(SupName, Callback) -> supervisor2:start_link(SupName, ?MODULE, Callback). init({M,F,A}) -> - {ok, {{simple_one_for_one_terminate, 10, 10}, - [{tcp_client, {M,F,A}, - temporary, infinity, supervisor, [M]}]}}. + {ok, {{simple_one_for_one_terminate, 0, 1}, + [{client, {M,F,A}, temporary, infinity, supervisor, [M]}]}}. diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl new file mode 100644 index 00000000..3b8c9fba --- /dev/null +++ b/src/rabbit_direct.erl @@ -0,0 +1,75 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% + +-module(rabbit_direct). + +-export([boot/0, connect/3, start_channel/5]). + +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(boot/0 :: () -> 'ok'). +-spec(connect/3 :: (binary(), binary(), binary()) -> + {'ok', {rabbit_types:user(), + rabbit_framing:amqp_table()}}). +-spec(start_channel/5 :: (rabbit_channel:channel_number(), pid(), + rabbit_types:user(), rabbit_types:vhost(), pid()) -> + {'ok', pid()}). + +-endif. + +%%---------------------------------------------------------------------------- + +boot() -> + {ok, _} = + supervisor2:start_child( + rabbit_sup, + {rabbit_direct_client_sup, + {rabbit_client_sup, start_link, + [{local, rabbit_direct_client_sup}, + {rabbit_channel_sup, start_link, []}]}, + transient, infinity, supervisor, [rabbit_client_sup]}), + ok. + +%%---------------------------------------------------------------------------- + +connect(Username, Password, VHost) -> + case lists:keymember(rabbit, 1, application:which_applications()) of + true -> + try rabbit_access_control:user_pass_login(Username, Password) of + #user{} = User -> + try rabbit_access_control:check_vhost_access(User, VHost) of + ok -> {ok, {User, rabbit_reader:server_properties()}} + catch + exit:#amqp_error{name = access_refused} -> + {error, access_refused} + end + catch + exit:#amqp_error{name = access_refused} -> {error, auth_failure} + end; + false -> + {error, broker_not_found_on_node} + end. + +start_channel(Number, ClientChannelPid, User, VHost, Collector) -> + {ok, _, {ChannelPid, _}} = + supervisor2:start_child( + rabbit_direct_client_sup, + [{direct, Number, ClientChannelPid, User, VHost, Collector}]), + {ok, ChannelPid}. diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 031d4f18..283d25c7 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -115,13 +115,13 @@ boot_ssl() -> end. start() -> - {ok,_} = supervisor:start_child( + {ok,_} = supervisor2:start_child( rabbit_sup, {rabbit_tcp_client_sup, - {tcp_client_sup, start_link, + {rabbit_client_sup, start_link, [{local, rabbit_tcp_client_sup}, {rabbit_connection_sup,start_link,[]}]}, - transient, infinity, supervisor, [tcp_client_sup]}), + transient, infinity, supervisor, [rabbit_client_sup]}), ok. %% inet_parse:address takes care of ip string, like "0.0.0.0" diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 52e0bb9d..475c415e 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -940,8 +940,8 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> vhost = VHost}} = State, {ok, _ChSupPid, {ChPid, AState}} = rabbit_channel_sup_sup:start_channel( - ChanSupSup, {Protocol, Sock, Channel, FrameMax, - self(), User, VHost, Collector}), + ChanSupSup, {tcp, Protocol, Sock, Channel, FrameMax, self(), User, + VHost, Collector}), erlang:monitor(process, ChPid), NewAState = process_channel_frame(AnalyzedFrame, self(), Channel, ChPid, AState), |