summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-01-28 17:08:31 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-01-28 17:08:31 +0000
commit8a9c48c119704a6be63acdc03c805d20bd84286c (patch)
treedd44624fec468ae51df37914870872cb4d3d53c7
parentf253de6c4c6fe2c9460835f475f963479a4fba4d (diff)
parent2924a63b5dace9e3ae1c8f6bdee6188f4f1ff33c (diff)
downloadrabbitmq-server-8a9c48c119704a6be63acdc03c805d20bd84286c.tar.gz
merging bug23696 to default
-rw-r--r--ebin/rabbit_app.in3
-rw-r--r--src/delegate.erl27
-rw-r--r--src/delegate_sup.erl2
-rw-r--r--src/rabbit.erl10
-rw-r--r--src/rabbit_channel_sup.erl20
-rw-r--r--src/rabbit_client_sup.erl (renamed from src/tcp_client_sup.erl)22
-rw-r--r--src/rabbit_direct.erl75
-rw-r--r--src/rabbit_networking.erl6
-rw-r--r--src/rabbit_reader.erl4
9 files changed, 138 insertions, 31 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index d3e9d84b..cc7221d6 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -8,7 +8,8 @@
rabbit_node_monitor,
rabbit_router,
rabbit_sup,
- rabbit_tcp_client_sup]},
+ rabbit_tcp_client_sup,
+ rabbit_direct_client_sup]},
{applications, [kernel, stdlib, sasl, mnesia, os_mon]},
%% we also depend on crypto, public_key and ssl but they shouldn't be
%% in here as we don't actually want to start it
diff --git a/src/delegate.erl b/src/delegate.erl
index ff55a15b..46bd8245 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server2).
--export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/0]).
+-export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -36,7 +36,7 @@
([pid()], fun ((pid()) -> A)) -> {[{pid(), A}],
[{pid(), term()}]}).
--spec(delegate_count/0 :: () -> non_neg_integer()).
+-spec(delegate_count/1 :: ([node()]) -> non_neg_integer()).
-endif.
@@ -68,9 +68,9 @@ invoke(Pids, Fun) when is_list(Pids) ->
{Replies, BadNodes} =
case orddict:fetch_keys(Grouped) of
[] -> {[], []};
- RemoteNodes -> gen_server2:multi_call(RemoteNodes, delegate(),
- {invoke, Fun, Grouped},
- infinity)
+ RemoteNodes -> gen_server2:multi_call(
+ RemoteNodes, delegate(RemoteNodes),
+ {invoke, Fun, Grouped}, infinity)
end,
BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} ||
BadNode <- BadNodes,
@@ -92,7 +92,7 @@ invoke_no_result(Pids, Fun) when is_list(Pids) ->
{LocalPids, Grouped} = group_pids_by_node(Pids),
case orddict:fetch_keys(Grouped) of
[] -> ok;
- RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(),
+ RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(RemoteNodes),
{invoke, Fun, Grouped})
end,
safe_invoke(LocalPids, Fun), %% must not die
@@ -111,17 +111,22 @@ group_pids_by_node(Pids) ->
node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)}
end, {[], orddict:new()}, Pids).
-delegate_count() ->
- {ok, Count} = application:get_env(rabbit, delegate_count),
+delegate_count([RemoteNode | _]) ->
+ {ok, Count} = case application:get_env(rabbit, delegate_count) of
+ undefined -> rpc:call(RemoteNode, application, get_env,
+ [rabbit, delegate_count]);
+ Result -> Result
+ end,
Count.
delegate_name(Hash) ->
list_to_atom("delegate_" ++ integer_to_list(Hash)).
-delegate() ->
+delegate(RemoteNodes) ->
case get(delegate) of
- undefined -> Name = delegate_name(
- erlang:phash2(self(), delegate_count())),
+ undefined -> Name =
+ delegate_name(erlang:phash2(
+ self(), delegate_count(RemoteNodes))),
put(delegate, Name),
Name;
Name -> Name
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
index 52747221..e0ffa7c8 100644
--- a/src/delegate_sup.erl
+++ b/src/delegate_sup.erl
@@ -40,7 +40,7 @@ start_link() ->
%%----------------------------------------------------------------------------
init(_Args) ->
- DCount = delegate:delegate_count(),
+ DCount = delegate:delegate_count([node()]),
{ok, {{one_for_one, 10, 10},
[{Num, {delegate, start_link, [Num]},
transient, 16#ffffffff, worker, [delegate]} ||
diff --git a/src/rabbit.erl b/src/rabbit.erl
index b041a637..c6661d39 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -145,13 +145,13 @@
{requires, routing_ready},
{enables, networking}]}).
+-rabbit_boot_step({direct_client,
+ [{mfa, {rabbit_direct, boot, []}},
+ {requires, log_relay}]}).
+
-rabbit_boot_step({networking,
[{mfa, {rabbit_networking, boot, []}},
- {requires, log_relay},
- {enables, networking_listening}]}).
-
--rabbit_boot_step({networking_listening,
- [{description, "network listeners available"}]}).
+ {requires, log_relay}]}).
%%---------------------------------------------------------------------------
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index d426d55d..d21cfdb7 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -31,9 +31,11 @@
-export_type([start_link_args/0]).
-type(start_link_args() ::
- {rabbit_types:protocol(), rabbit_net:socket(),
+ {'tcp', rabbit_types:protocol(), rabbit_net:socket(),
rabbit_channel:channel_number(), non_neg_integer(), pid(),
- rabbit_types:user(), rabbit_types:vhost(), pid()}).
+ rabbit_types:user(), rabbit_types:vhost(), pid()} |
+ {'direct', rabbit_channel:channel_number(), pid(), rabbit_types:user(),
+ rabbit_types:vhost(), pid()}).
-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}).
@@ -41,7 +43,7 @@
%%----------------------------------------------------------------------------
-start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
+start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
Collector}) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, WriterPid} =
@@ -58,7 +60,17 @@ start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
Collector, start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, AState} = rabbit_command_assembler:init(Protocol),
- {ok, SupPid, {ChannelPid, AState}}.
+ {ok, SupPid, {ChannelPid, AState}};
+start_link({direct, Channel, ClientChannelPid, User, VHost, Collector}) ->
+ {ok, SupPid} = supervisor2:start_link(?MODULE, []),
+ {ok, ChannelPid} =
+ supervisor2:start_child(
+ SupPid,
+ {channel, {rabbit_channel, start_link,
+ [Channel, ClientChannelPid, ClientChannelPid,
+ User, VHost, Collector, start_limiter_fun(SupPid)]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
+ {ok, SupPid, {ChannelPid, none}}.
%%----------------------------------------------------------------------------
diff --git a/src/tcp_client_sup.erl b/src/rabbit_client_sup.erl
index 1c2bbb65..dbdc6cd4 100644
--- a/src/tcp_client_sup.erl
+++ b/src/rabbit_client_sup.erl
@@ -14,7 +14,7 @@
%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
%%
--module(tcp_client_sup).
+-module(rabbit_client_sup).
-behaviour(supervisor2).
@@ -22,6 +22,21 @@
-export([init/1]).
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/1 :: (mfa()) ->
+ rabbit_types:ok_pid_or_error()).
+-spec(start_link/2 :: ({'local', atom()}, mfa()) ->
+ rabbit_types:ok_pid_or_error()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
start_link(Callback) ->
supervisor2:start_link(?MODULE, Callback).
@@ -29,6 +44,5 @@ start_link(SupName, Callback) ->
supervisor2:start_link(SupName, ?MODULE, Callback).
init({M,F,A}) ->
- {ok, {{simple_one_for_one_terminate, 10, 10},
- [{tcp_client, {M,F,A},
- temporary, infinity, supervisor, [M]}]}}.
+ {ok, {{simple_one_for_one_terminate, 0, 1},
+ [{client, {M,F,A}, temporary, infinity, supervisor, [M]}]}}.
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
new file mode 100644
index 00000000..3b8c9fba
--- /dev/null
+++ b/src/rabbit_direct.erl
@@ -0,0 +1,75 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_direct).
+
+-export([boot/0, connect/3, start_channel/5]).
+
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(boot/0 :: () -> 'ok').
+-spec(connect/3 :: (binary(), binary(), binary()) ->
+ {'ok', {rabbit_types:user(),
+ rabbit_framing:amqp_table()}}).
+-spec(start_channel/5 :: (rabbit_channel:channel_number(), pid(),
+ rabbit_types:user(), rabbit_types:vhost(), pid()) ->
+ {'ok', pid()}).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+boot() ->
+ {ok, _} =
+ supervisor2:start_child(
+ rabbit_sup,
+ {rabbit_direct_client_sup,
+ {rabbit_client_sup, start_link,
+ [{local, rabbit_direct_client_sup},
+ {rabbit_channel_sup, start_link, []}]},
+ transient, infinity, supervisor, [rabbit_client_sup]}),
+ ok.
+
+%%----------------------------------------------------------------------------
+
+connect(Username, Password, VHost) ->
+ case lists:keymember(rabbit, 1, application:which_applications()) of
+ true ->
+ try rabbit_access_control:user_pass_login(Username, Password) of
+ #user{} = User ->
+ try rabbit_access_control:check_vhost_access(User, VHost) of
+ ok -> {ok, {User, rabbit_reader:server_properties()}}
+ catch
+ exit:#amqp_error{name = access_refused} ->
+ {error, access_refused}
+ end
+ catch
+ exit:#amqp_error{name = access_refused} -> {error, auth_failure}
+ end;
+ false ->
+ {error, broker_not_found_on_node}
+ end.
+
+start_channel(Number, ClientChannelPid, User, VHost, Collector) ->
+ {ok, _, {ChannelPid, _}} =
+ supervisor2:start_child(
+ rabbit_direct_client_sup,
+ [{direct, Number, ClientChannelPid, User, VHost, Collector}]),
+ {ok, ChannelPid}.
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 031d4f18..283d25c7 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -115,13 +115,13 @@ boot_ssl() ->
end.
start() ->
- {ok,_} = supervisor:start_child(
+ {ok,_} = supervisor2:start_child(
rabbit_sup,
{rabbit_tcp_client_sup,
- {tcp_client_sup, start_link,
+ {rabbit_client_sup, start_link,
[{local, rabbit_tcp_client_sup},
{rabbit_connection_sup,start_link,[]}]},
- transient, infinity, supervisor, [tcp_client_sup]}),
+ transient, infinity, supervisor, [rabbit_client_sup]}),
ok.
%% inet_parse:address takes care of ip string, like "0.0.0.0"
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 52e0bb9d..475c415e 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -940,8 +940,8 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
vhost = VHost}} = State,
{ok, _ChSupPid, {ChPid, AState}} =
rabbit_channel_sup_sup:start_channel(
- ChanSupSup, {Protocol, Sock, Channel, FrameMax,
- self(), User, VHost, Collector}),
+ ChanSupSup, {tcp, Protocol, Sock, Channel, FrameMax, self(), User,
+ VHost, Collector}),
erlang:monitor(process, ChPid),
NewAState = process_channel_frame(AnalyzedFrame, self(),
Channel, ChPid, AState),