summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVlad Alexandru Ionescu <vlad@rabbitmq.com>2011-01-19 21:27:03 +0000
committerVlad Alexandru Ionescu <vlad@rabbitmq.com>2011-01-19 21:27:03 +0000
commitf8ddf02f3d43445ba57ed290ff5b9f1df2b499b4 (patch)
treeb775bd82609a2f2774c85d138d08d76e752803e9
parenta3ff54bc80fbe7cef61edc4a53142204f99f8f38 (diff)
parentf15f44d12f8f0c0f9bd9bfa899bd29aca2ed4692 (diff)
downloadrabbitmq-server-f8ddf02f3d43445ba57ed290ff5b9f1df2b499b4.tar.gz
merging in from default
-rw-r--r--ebin/rabbit_app.in3
-rw-r--r--src/rabbit.erl8
-rw-r--r--src/rabbit_channel_sup.erl21
-rw-r--r--src/rabbit_channel_sup_sup.erl11
-rw-r--r--src/rabbit_client_sup.erl (renamed from src/tcp_client_sup.erl)7
-rw-r--r--src/rabbit_direct.erl52
-rw-r--r--src/rabbit_networking.erl6
-rw-r--r--src/rabbit_reader.erl4
8 files changed, 92 insertions, 20 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 5ed872b6..bb0fbcd3 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -8,7 +8,8 @@
rabbit_node_monitor,
rabbit_router,
rabbit_sup,
- rabbit_tcp_client_sup]},
+ rabbit_tcp_client_sup,
+ rabbit_direct_client_sup]},
{applications, [kernel, stdlib, sasl, mnesia, os_mon]},
%% we also depend on crypto, public_key and ssl but they shouldn't be
%% in here as we don't actually want to start it
diff --git a/src/rabbit.erl b/src/rabbit.erl
index b041a637..9de6e794 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -145,11 +145,19 @@
{requires, routing_ready},
{enables, networking}]}).
+-rabbit_boot_step({direct_client,
+ [{mfa, {rabbit_direct, boot, []}},
+ {requires, log_relay},
+ {enables, direct_listening}]}).
+
-rabbit_boot_step({networking,
[{mfa, {rabbit_networking, boot, []}},
{requires, log_relay},
{enables, networking_listening}]}).
+-rabbit_boot_step({direct_listening,
+ [{description, "direct connection listeners available"}]}).
+
-rabbit_boot_step({networking_listening,
[{description, "network listeners available"}]}).
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index d426d55d..3b00181b 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -18,7 +18,7 @@
-behaviour(supervisor2).
--export([start_link/1]).
+-export([start_link/2]).
-export([init/1]).
@@ -35,14 +35,15 @@
rabbit_channel:channel_number(), non_neg_integer(), pid(),
rabbit_types:user(), rabbit_types:vhost(), pid()}).
--spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}).
+-spec(start_link/2 :: (atom(), start_link_args()) ->
+ {'ok', pid(), {pid(), any()}}).
-endif.
%%----------------------------------------------------------------------------
-start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
- Collector}) ->
+start_link(tcp, {Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
+ Collector}) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, WriterPid} =
supervisor2:start_child(
@@ -58,7 +59,17 @@ start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
Collector, start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, AState} = rabbit_command_assembler:init(Protocol),
- {ok, SupPid, {ChannelPid, AState}}.
+ {ok, SupPid, {ChannelPid, AState}};
+start_link(direct, {Channel, ClientChannelPid, User, VHost, Collector}) ->
+ {ok, SupPid} = supervisor2:start_link(?MODULE, []),
+ {ok, ChannelPid} =
+ supervisor2:start_child(
+ SupPid,
+ {channel, {rabbit_channel, start_link,
+ [Channel, ClientChannelPid, ClientChannelPid,
+ User, VHost, Collector, start_limiter_fun(SupPid)]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
+ {ok, SupPid, {ChannelPid, none}}.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl
index e2561c80..9240ee37 100644
--- a/src/rabbit_channel_sup_sup.erl
+++ b/src/rabbit_channel_sup_sup.erl
@@ -18,7 +18,7 @@
-behaviour(supervisor2).
--export([start_link/0, start_channel/2]).
+-export([start_link/0, start_channel/3]).
-export([init/1]).
@@ -27,8 +27,9 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(start_channel/2 :: (pid(), rabbit_channel_sup:start_link_args()) ->
- {'ok', pid(), {pid(), any()}}).
+-spec(start_channel/3 :: (pid(), atom(),
+ rabbit_channel_sup:start_link_args()) ->
+ {'ok', pid(), {pid(), any()}}).
-endif.
@@ -37,8 +38,8 @@
start_link() ->
supervisor2:start_link(?MODULE, []).
-start_channel(Pid, Args) ->
- supervisor2:start_child(Pid, [Args]).
+start_channel(Pid, Type, Args) ->
+ supervisor2:start_child(Pid, [Type, Args]).
%%----------------------------------------------------------------------------
diff --git a/src/tcp_client_sup.erl b/src/rabbit_client_sup.erl
index 1c2bbb65..336a8cd7 100644
--- a/src/tcp_client_sup.erl
+++ b/src/rabbit_client_sup.erl
@@ -14,7 +14,7 @@
%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
%%
--module(tcp_client_sup).
+-module(rabbit_client_sup).
-behaviour(supervisor2).
@@ -29,6 +29,5 @@ start_link(SupName, Callback) ->
supervisor2:start_link(SupName, ?MODULE, Callback).
init({M,F,A}) ->
- {ok, {{simple_one_for_one_terminate, 10, 10},
- [{tcp_client, {M,F,A},
- temporary, infinity, supervisor, [M]}]}}.
+ {ok, {{simple_one_for_one_terminate, 0, 1},
+ [{client, {M,F,A}, temporary, infinity, supervisor, [M]}]}}.
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
new file mode 100644
index 00000000..b1c5f415
--- /dev/null
+++ b/src/rabbit_direct.erl
@@ -0,0 +1,52 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_direct).
+
+-export([boot/0, start_channel/1]).
+
+%%----------------------------------------------------------------------------
+
+boot() ->
+ {ok, _} =
+ supervisor2:start_child(
+ rabbit_sup,
+ {rabbit_direct_client_sup,
+ {rabbit_client_sup, start_link,
+ [{local, rabbit_direct_client_sup},
+ {rabbit_channel_sup, start_link, [direct]}]},
+ transient, infinity, supervisor, [rabbit_client_sup]}),
+ ok.
+
+start_channel(Args) ->
+ {ok, _, {ChannelPid, _}} =
+ supervisor2:start_child(rabbit_direct_client_sup, [Args]),
+ {ok, ChannelPid}.
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 9788c922..0a7d9dd7 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -108,13 +108,13 @@ boot_ssl() ->
end.
start() ->
- {ok,_} = supervisor:start_child(
+ {ok,_} = supervisor2:start_child(
rabbit_sup,
{rabbit_tcp_client_sup,
- {tcp_client_sup, start_link,
+ {rabbit_client_sup, start_link,
[{local, rabbit_tcp_client_sup},
{rabbit_connection_sup,start_link,[]}]},
- transient, infinity, supervisor, [tcp_client_sup]}),
+ transient, infinity, supervisor, [rabbit_client_sup]}),
ok.
getaddr(Host) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 696dc265..c2245a13 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -940,8 +940,8 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
vhost = VHost}} = State,
{ok, _ChSupPid, {ChPid, AState}} =
rabbit_channel_sup_sup:start_channel(
- ChanSupSup, {Protocol, Sock, Channel, FrameMax,
- self(), User, VHost, Collector}),
+ ChanSupSup, tcp, {Protocol, Sock, Channel, FrameMax,
+ self(), User, VHost, Collector}),
erlang:monitor(process, ChPid),
NewAState = process_channel_frame(AnalyzedFrame, self(),
Channel, ChPid, AState),