summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-02-17 12:21:48 +0000
committerSimon MacMullen <simon@rabbitmq.com>2011-02-17 12:21:48 +0000
commitf3139c0e04b8056f8cc152aa9795ad522c90e882 (patch)
tree989c6895e6cd56714f274e335151d6163eaaeeaf
parent9c35ddb67e49f4f2c146bc3d9ba30e18a4f67345 (diff)
parentcba50e22e19538edc112adb6d1599a357a7bb703 (diff)
downloadrabbitmq-server-f3139c0e04b8056f8cc152aa9795ad522c90e882.tar.gz
Merge bug23839 (Add capabilities to client connection)
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/rabbit_channel.erl23
-rw-r--r--src/rabbit_channel_sup.erl17
-rw-r--r--src/rabbit_direct.erl14
-rw-r--r--src/rabbit_reader.erl19
-rw-r--r--src/rabbit_tests.erl6
6 files changed, 48 insertions, 33 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 15f5d7c5..24d0f961 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -28,7 +28,7 @@
-record(vhost, {virtual_host, dummy}).
-record(connection, {protocol, user, timeout_sec, frame_max, vhost,
- client_properties}).
+ client_properties, capabilities}).
-record(content,
{class_id,
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a82e5eff..a6790b6c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -20,7 +20,7 @@
-behaviour(gen_server2).
--export([start_link/7, do/2, do/3, flush/1, shutdown/1]).
+-export([start_link/8, do/2, do/3, flush/1, shutdown/1]).
-export([send_command/2, deliver/4, flushed/2, confirm/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([emit_stats/1]).
@@ -34,7 +34,8 @@
uncommitted_ack_q, unacked_message_q,
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
- confirm_enabled, publish_seqno, unconfirmed, confirmed}).
+ confirm_enabled, publish_seqno, unconfirmed, confirmed,
+ capabilities}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -66,9 +67,9 @@
-type(channel_number() :: non_neg_integer()).
--spec(start_link/7 ::
+-spec(start_link/8 ::
(channel_number(), pid(), pid(), rabbit_types:user(),
- rabbit_types:vhost(), pid(),
+ rabbit_types:vhost(), rabbit_framing:amqp_table(), pid(),
fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) ->
rabbit_types:ok_pid_or_error()).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
@@ -94,10 +95,11 @@
%%----------------------------------------------------------------------------
-start_link(Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
- StartLimiterFun) ->
- gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, User,
- VHost, CollectorPid, StartLimiterFun], []).
+start_link(Channel, ReaderPid, WriterPid, User, VHost, Capabilities,
+ CollectorPid, StartLimiterFun) ->
+ gen_server2:start_link(?MODULE,
+ [Channel, ReaderPid, WriterPid, User, VHost,
+ Capabilities, CollectorPid, StartLimiterFun], []).
do(Pid, Method) ->
do(Pid, Method, none).
@@ -148,7 +150,7 @@ emit_stats(Pid) ->
%%---------------------------------------------------------------------------
-init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
+init([Channel, ReaderPid, WriterPid, User, VHost, Capabilities, CollectorPid,
StartLimiterFun]) ->
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
@@ -174,7 +176,8 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
confirm_enabled = false,
publish_seqno = 1,
unconfirmed = gb_trees:empty(),
- confirmed = []},
+ confirmed = [],
+ capabilities = Capabilities},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
rabbit_event:if_enabled(StatsTimer,
fun() -> internal_emit_stats(State) end),
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index d21cfdb7..90058194 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -33,9 +33,10 @@
-type(start_link_args() ::
{'tcp', rabbit_types:protocol(), rabbit_net:socket(),
rabbit_channel:channel_number(), non_neg_integer(), pid(),
- rabbit_types:user(), rabbit_types:vhost(), pid()} |
+ rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
+ pid()} |
{'direct', rabbit_channel:channel_number(), pid(), rabbit_types:user(),
- rabbit_types:vhost(), pid()}).
+ rabbit_types:vhost(), rabbit_framing:amqp_table(), pid()}).
-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}).
@@ -44,7 +45,7 @@
%%----------------------------------------------------------------------------
start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
- Collector}) ->
+ Capabilities, Collector}) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, WriterPid} =
supervisor2:start_child(
@@ -56,19 +57,21 @@ start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
supervisor2:start_child(
SupPid,
{channel, {rabbit_channel, start_link,
- [Channel, ReaderPid, WriterPid, User, VHost,
+ [Channel, ReaderPid, WriterPid, User, VHost, Capabilities,
Collector, start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, AState} = rabbit_command_assembler:init(Protocol),
{ok, SupPid, {ChannelPid, AState}};
-start_link({direct, Channel, ClientChannelPid, User, VHost, Collector}) ->
+start_link({direct, Channel, ClientChannelPid, User, VHost, Capabilities,
+ Collector}) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, ChannelPid} =
supervisor2:start_child(
SupPid,
{channel, {rabbit_channel, start_link,
- [Channel, ClientChannelPid, ClientChannelPid,
- User, VHost, Collector, start_limiter_fun(SupPid)]},
+ [Channel, ClientChannelPid, ClientChannelPid, User,
+ VHost, Capabilities, Collector,
+ start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, SupPid, {ChannelPid, none}}.
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index bd41a8b9..5c89bf49 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -16,7 +16,7 @@
-module(rabbit_direct).
--export([boot/0, connect/4, start_channel/5]).
+-export([boot/0, connect/4, start_channel/6]).
-include("rabbit.hrl").
@@ -28,9 +28,10 @@
-spec(connect/4 :: (binary(), binary(), binary(), rabbit_types:protocol()) ->
{'ok', {rabbit_types:user(),
rabbit_framing:amqp_table()}}).
--spec(start_channel/5 :: (rabbit_channel:channel_number(), pid(),
- rabbit_types:user(), rabbit_types:vhost(), pid()) ->
- {'ok', pid()}).
+-spec(start_channel/6 ::
+ (rabbit_channel:channel_number(), pid(), rabbit_types:user(),
+ rabbit_types:vhost(), rabbit_framing:amqp_table(), pid()) ->
+ {'ok', pid()}).
-endif.
@@ -68,9 +69,10 @@ connect(Username, Password, VHost, Protocol) ->
{error, broker_not_found_on_node}
end.
-start_channel(Number, ClientChannelPid, User, VHost, Collector) ->
+start_channel(Number, ClientChannelPid, User, VHost, Capabilities, Collector) ->
{ok, _, {ChannelPid, _}} =
supervisor2:start_child(
rabbit_direct_client_sup,
- [{direct, Number, ClientChannelPid, User, VHost, Collector}]),
+ [{direct, Number, ClientChannelPid, User, VHost, Capabilities,
+ Collector}]),
{ok, ChannelPid}.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index e8225316..e9ff97f9 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -718,12 +718,18 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
connection = Connection,
sock = Sock}) ->
AuthMechanism = auth_mechanism_to_module(Mechanism),
+ Capabilities =
+ case rabbit_misc:table_lookup(ClientProperties, <<"capabilities">>) of
+ {table, Capabilities1} -> Capabilities1;
+ _ -> []
+ end,
State = State0#v1{auth_mechanism = AuthMechanism,
auth_state = AuthMechanism:init(Sock),
connection_state = securing,
connection =
Connection#connection{
- client_properties = ClientProperties}},
+ client_properties = ClientProperties,
+ capabilities = Capabilities}},
auth_phase(Response, State);
handle_method0(#'connection.secure_ok'{response = Response},
@@ -956,14 +962,15 @@ cert_info(F, Sock) ->
send_to_new_channel(Channel, AnalyzedFrame, State) ->
#v1{sock = Sock, queue_collector = Collector,
channel_sup_sup_pid = ChanSupSup,
- connection = #connection{protocol = Protocol,
- frame_max = FrameMax,
- user = User,
- vhost = VHost}} = State,
+ connection = #connection{protocol = Protocol,
+ frame_max = FrameMax,
+ user = User,
+ vhost = VHost,
+ capabilities = Capabilities}} = State,
{ok, _ChSupPid, {ChPid, AState}} =
rabbit_channel_sup_sup:start_channel(
ChanSupSup, {tcp, Protocol, Sock, Channel, FrameMax, self(), User,
- VHost, Collector}),
+ VHost, Capabilities, Collector}),
erlang:monitor(process, ChPid),
NewAState = process_channel_frame(AnalyzedFrame, self(),
Channel, ChPid, AState),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index bc9a84c8..59862821 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1020,7 +1020,7 @@ test_server_status() ->
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
{ok, Ch} = rabbit_channel:start_link(1, self(), Writer,
- user(<<"user">>), <<"/">>, self(),
+ user(<<"user">>), <<"/">>, [], self(),
fun (_) -> {ok, self()} end),
[Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
{new, Queue = #amqqueue{}} <-
@@ -1079,8 +1079,8 @@ test_server_status() ->
test_spawn(Receiver) ->
Me = self(),
Writer = spawn(fun () -> Receiver(Me) end),
- {ok, Ch} = rabbit_channel:start_link(1, Me, Writer,
- user(<<"guest">>), <<"/">>, self(),
+ {ok, Ch} = rabbit_channel:start_link(1, Me, Writer, user(<<"guest">>),
+ <<"/">>, [], self(),
fun (_) -> {ok, self()} end),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
receive #'channel.open_ok'{} -> ok