summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-02-14 18:10:44 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-02-14 18:10:44 +0000
commit88651d6fc3d3f6bfcd9a94bceec465d9a5271865 (patch)
tree42e01c371f0c83b5ee8d93f0ebf7031d81a46880
parentab98fba181a2c34d81958bee8be72535bc2b7321 (diff)
downloadrabbitmq-server-88651d6fc3d3f6bfcd9a94bceec465d9a5271865.tar.gz
Pass client capabilities through to the channel
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/rabbit_channel.erl25
-rw-r--r--src/rabbit_channel_sup.erl25
-rw-r--r--src/rabbit_direct.erl14
-rw-r--r--src/rabbit_reader.erl21
-rw-r--r--src/rabbit_tests.erl4
6 files changed, 53 insertions, 38 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 15f5d7c5..24d0f961 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -28,7 +28,7 @@
-record(vhost, {virtual_host, dummy}).
-record(connection, {protocol, user, timeout_sec, frame_max, vhost,
- client_properties}).
+ client_properties, capabilities}).
-record(content,
{class_id,
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a82e5eff..d6396c99 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -20,7 +20,7 @@
-behaviour(gen_server2).
--export([start_link/7, do/2, do/3, flush/1, shutdown/1]).
+-export([start_link/8, do/2, do/3, flush/1, shutdown/1]).
-export([send_command/2, deliver/4, flushed/2, confirm/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([emit_stats/1]).
@@ -34,7 +34,8 @@
uncommitted_ack_q, unacked_message_q,
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
- confirm_enabled, publish_seqno, unconfirmed, confirmed}).
+ confirm_enabled, publish_seqno, unconfirmed, confirmed,
+ capabilities}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -66,9 +67,9 @@
-type(channel_number() :: non_neg_integer()).
--spec(start_link/7 ::
- (channel_number(), pid(), pid(), rabbit_types:user(),
- rabbit_types:vhost(), pid(),
+-spec(start_link/8 ::
+ (rabbit_framing:amqp_table(), channel_number(), pid(), pid(),
+ rabbit_types:user(), rabbit_types:vhost(), pid(),
fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) ->
rabbit_types:ok_pid_or_error()).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
@@ -94,10 +95,11 @@
%%----------------------------------------------------------------------------
-start_link(Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
- StartLimiterFun) ->
- gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, User,
- VHost, CollectorPid, StartLimiterFun], []).
+start_link(Capabilities, Channel, ReaderPid, WriterPid, User, VHost,
+ CollectorPid, StartLimiterFun) ->
+ gen_server2:start_link(?MODULE,
+ [Capabilities, Channel, ReaderPid, WriterPid, User,
+ VHost, CollectorPid, StartLimiterFun], []).
do(Pid, Method) ->
do(Pid, Method, none).
@@ -148,7 +150,7 @@ emit_stats(Pid) ->
%%---------------------------------------------------------------------------
-init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
+init([Capabilities, Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
StartLimiterFun]) ->
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
@@ -174,7 +176,8 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
confirm_enabled = false,
publish_seqno = 1,
unconfirmed = gb_trees:empty(),
- confirmed = []},
+ confirmed = [],
+ capabilities = Capabilities},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
rabbit_event:if_enabled(StatsTimer,
fun() -> internal_emit_stats(State) end),
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index d21cfdb7..6dc0eba4 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -31,11 +31,12 @@
-export_type([start_link_args/0]).
-type(start_link_args() ::
- {'tcp', rabbit_types:protocol(), rabbit_net:socket(),
- rabbit_channel:channel_number(), non_neg_integer(), pid(),
- rabbit_types:user(), rabbit_types:vhost(), pid()} |
- {'direct', rabbit_channel:channel_number(), pid(), rabbit_types:user(),
- rabbit_types:vhost(), pid()}).
+ {'tcp', rabbit_types:protocol(), rabbit_framing:amqp_table(),
+ rabbit_net:socket(), rabbit_channel:channel_number(),
+ non_neg_integer(), pid(), rabbit_types:user(), rabbit_types:vhost(),
+ pid()} |
+ {'direct', rabbit_framing:amqp_table(), rabbit_channel:channel_number(),
+ pid(), rabbit_types:user(), rabbit_types:vhost(), pid()}).
-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}).
@@ -43,8 +44,8 @@
%%----------------------------------------------------------------------------
-start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
- Collector}) ->
+start_link({tcp, Protocol, Capabilities, Sock, Channel, FrameMax, ReaderPid,
+ User, VHost, Collector}) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, WriterPid} =
supervisor2:start_child(
@@ -56,19 +57,21 @@ start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
supervisor2:start_child(
SupPid,
{channel, {rabbit_channel, start_link,
- [Channel, ReaderPid, WriterPid, User, VHost,
+ [Capabilities, Channel, ReaderPid, WriterPid, User, VHost,
Collector, start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, AState} = rabbit_command_assembler:init(Protocol),
{ok, SupPid, {ChannelPid, AState}};
-start_link({direct, Channel, ClientChannelPid, User, VHost, Collector}) ->
+start_link({direct, Capabilities, Channel, ClientChannelPid, User, VHost,
+ Collector}) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, ChannelPid} =
supervisor2:start_child(
SupPid,
{channel, {rabbit_channel, start_link,
- [Channel, ClientChannelPid, ClientChannelPid,
- User, VHost, Collector, start_limiter_fun(SupPid)]},
+ [Capabilities, Channel, ClientChannelPid,
+ ClientChannelPid, User, VHost, Collector,
+ start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, SupPid, {ChannelPid, none}}.
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 3b8c9fba..34f868cb 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -16,7 +16,7 @@
-module(rabbit_direct).
--export([boot/0, connect/3, start_channel/5]).
+-export([boot/0, connect/3, start_channel/6]).
-include("rabbit.hrl").
@@ -28,9 +28,10 @@
-spec(connect/3 :: (binary(), binary(), binary()) ->
{'ok', {rabbit_types:user(),
rabbit_framing:amqp_table()}}).
--spec(start_channel/5 :: (rabbit_channel:channel_number(), pid(),
- rabbit_types:user(), rabbit_types:vhost(), pid()) ->
- {'ok', pid()}).
+-spec(start_channel/6 ::
+ (rabbit_framing:amqp_table(), rabbit_channel:channel_number(), pid(),
+ rabbit_types:user(), rabbit_types:vhost(), pid()) ->
+ {'ok', pid()}).
-endif.
@@ -67,9 +68,10 @@ connect(Username, Password, VHost) ->
{error, broker_not_found_on_node}
end.
-start_channel(Number, ClientChannelPid, User, VHost, Collector) ->
+start_channel(Capabilities, Number, ClientChannelPid, User, VHost, Collector) ->
{ok, _, {ChannelPid, _}} =
supervisor2:start_child(
rabbit_direct_client_sup,
- [{direct, Number, ClientChannelPid, User, VHost, Collector}]),
+ [{direct, Capabilities, Number, ClientChannelPid, User, VHost,
+ Collector}]),
{ok, ChannelPid}.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 1781469a..7753bb50 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -709,12 +709,18 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
connection = Connection,
sock = Sock}) ->
AuthMechanism = auth_mechanism_to_module(Mechanism),
+ Capabilities =
+ case lists:keyfind(<<"capabilities">>, 1, ClientProperties) of
+ {<<"capabilities">>, table, Capabilities1} -> Capabilities1;
+ _ -> []
+ end,
State = State0#v1{auth_mechanism = AuthMechanism,
auth_state = AuthMechanism:init(Sock),
connection_state = securing,
connection =
Connection#connection{
- client_properties = ClientProperties}},
+ client_properties = ClientProperties,
+ capabilities = Capabilities}},
auth_phase(Response, State);
handle_method0(#'connection.secure_ok'{response = Response},
@@ -947,14 +953,15 @@ cert_info(F, Sock) ->
send_to_new_channel(Channel, AnalyzedFrame, State) ->
#v1{sock = Sock, queue_collector = Collector,
channel_sup_sup_pid = ChanSupSup,
- connection = #connection{protocol = Protocol,
- frame_max = FrameMax,
- user = User,
- vhost = VHost}} = State,
+ connection = #connection{protocol = Protocol,
+ frame_max = FrameMax,
+ user = User,
+ vhost = VHost,
+ capabilities = Capabilities}} = State,
{ok, _ChSupPid, {ChPid, AState}} =
rabbit_channel_sup_sup:start_channel(
- ChanSupSup, {tcp, Protocol, Sock, Channel, FrameMax, self(), User,
- VHost, Collector}),
+ ChanSupSup, {tcp, Protocol, Capabilities, Sock, Channel, FrameMax,
+ self(), User, VHost, Collector}),
erlang:monitor(process, ChPid),
NewAState = process_channel_frame(AnalyzedFrame, self(),
Channel, ChPid, AState),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 58c369b5..d6c85af1 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1019,7 +1019,7 @@ test_user_management() ->
test_server_status() ->
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
- {ok, Ch} = rabbit_channel:start_link(1, self(), Writer,
+ {ok, Ch} = rabbit_channel:start_link([], 1, self(), Writer,
user(<<"user">>), <<"/">>, self(),
fun (_) -> {ok, self()} end),
[Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
@@ -1079,7 +1079,7 @@ test_server_status() ->
test_spawn(Receiver) ->
Me = self(),
Writer = spawn(fun () -> Receiver(Me) end),
- {ok, Ch} = rabbit_channel:start_link(1, Me, Writer,
+ {ok, Ch} = rabbit_channel:start_link([], 1, Me, Writer,
user(<<"guest">>), <<"/">>, self(),
fun (_) -> {ok, self()} end),
ok = rabbit_channel:do(Ch, #'channel.open'{}),