diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-02-14 18:10:44 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-02-14 18:10:44 +0000 |
commit | 88651d6fc3d3f6bfcd9a94bceec465d9a5271865 (patch) | |
tree | 42e01c371f0c83b5ee8d93f0ebf7031d81a46880 | |
parent | ab98fba181a2c34d81958bee8be72535bc2b7321 (diff) | |
download | rabbitmq-server-88651d6fc3d3f6bfcd9a94bceec465d9a5271865.tar.gz |
Pass client capabilities through to the channel
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 25 | ||||
-rw-r--r-- | src/rabbit_channel_sup.erl | 25 | ||||
-rw-r--r-- | src/rabbit_direct.erl | 14 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 21 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 4 |
6 files changed, 53 insertions, 38 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 15f5d7c5..24d0f961 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -28,7 +28,7 @@ -record(vhost, {virtual_host, dummy}). -record(connection, {protocol, user, timeout_sec, frame_max, vhost, - client_properties}). + client_properties, capabilities}). -record(content, {class_id, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a82e5eff..d6396c99 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -20,7 +20,7 @@ -behaviour(gen_server2). --export([start_link/7, do/2, do/3, flush/1, shutdown/1]). +-export([start_link/8, do/2, do/3, flush/1, shutdown/1]). -export([send_command/2, deliver/4, flushed/2, confirm/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([emit_stats/1]). @@ -34,7 +34,8 @@ uncommitted_ack_q, unacked_message_q, user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, - confirm_enabled, publish_seqno, unconfirmed, confirmed}). + confirm_enabled, publish_seqno, unconfirmed, confirmed, + capabilities}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -66,9 +67,9 @@ -type(channel_number() :: non_neg_integer()). --spec(start_link/7 :: - (channel_number(), pid(), pid(), rabbit_types:user(), - rabbit_types:vhost(), pid(), +-spec(start_link/8 :: + (rabbit_framing:amqp_table(), channel_number(), pid(), pid(), + rabbit_types:user(), rabbit_types:vhost(), pid(), fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) -> rabbit_types:ok_pid_or_error()). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). @@ -94,10 +95,11 @@ %%---------------------------------------------------------------------------- -start_link(Channel, ReaderPid, WriterPid, User, VHost, CollectorPid, - StartLimiterFun) -> - gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, User, - VHost, CollectorPid, StartLimiterFun], []). +start_link(Capabilities, Channel, ReaderPid, WriterPid, User, VHost, + CollectorPid, StartLimiterFun) -> + gen_server2:start_link(?MODULE, + [Capabilities, Channel, ReaderPid, WriterPid, User, + VHost, CollectorPid, StartLimiterFun], []). do(Pid, Method) -> do(Pid, Method, none). @@ -148,7 +150,7 @@ emit_stats(Pid) -> %%--------------------------------------------------------------------------- -init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid, +init([Capabilities, Channel, ReaderPid, WriterPid, User, VHost, CollectorPid, StartLimiterFun]) -> process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), @@ -174,7 +176,8 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid, confirm_enabled = false, publish_seqno = 1, unconfirmed = gb_trees:empty(), - confirmed = []}, + confirmed = [], + capabilities = Capabilities}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, fun() -> internal_emit_stats(State) end), diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index d21cfdb7..6dc0eba4 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -31,11 +31,12 @@ -export_type([start_link_args/0]). -type(start_link_args() :: - {'tcp', rabbit_types:protocol(), rabbit_net:socket(), - rabbit_channel:channel_number(), non_neg_integer(), pid(), - rabbit_types:user(), rabbit_types:vhost(), pid()} | - {'direct', rabbit_channel:channel_number(), pid(), rabbit_types:user(), - rabbit_types:vhost(), pid()}). + {'tcp', rabbit_types:protocol(), rabbit_framing:amqp_table(), + rabbit_net:socket(), rabbit_channel:channel_number(), + non_neg_integer(), pid(), rabbit_types:user(), rabbit_types:vhost(), + pid()} | + {'direct', rabbit_framing:amqp_table(), rabbit_channel:channel_number(), + pid(), rabbit_types:user(), rabbit_types:vhost(), pid()}). -spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}). @@ -43,8 +44,8 @@ %%---------------------------------------------------------------------------- -start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, - Collector}) -> +start_link({tcp, Protocol, Capabilities, Sock, Channel, FrameMax, ReaderPid, + User, VHost, Collector}) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, WriterPid} = supervisor2:start_child( @@ -56,19 +57,21 @@ start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, supervisor2:start_child( SupPid, {channel, {rabbit_channel, start_link, - [Channel, ReaderPid, WriterPid, User, VHost, + [Capabilities, Channel, ReaderPid, WriterPid, User, VHost, Collector, start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, AState} = rabbit_command_assembler:init(Protocol), {ok, SupPid, {ChannelPid, AState}}; -start_link({direct, Channel, ClientChannelPid, User, VHost, Collector}) -> +start_link({direct, Capabilities, Channel, ClientChannelPid, User, VHost, + Collector}) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, ChannelPid} = supervisor2:start_child( SupPid, {channel, {rabbit_channel, start_link, - [Channel, ClientChannelPid, ClientChannelPid, - User, VHost, Collector, start_limiter_fun(SupPid)]}, + [Capabilities, Channel, ClientChannelPid, + ClientChannelPid, User, VHost, Collector, + start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, SupPid, {ChannelPid, none}}. diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 3b8c9fba..34f868cb 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -16,7 +16,7 @@ -module(rabbit_direct). --export([boot/0, connect/3, start_channel/5]). +-export([boot/0, connect/3, start_channel/6]). -include("rabbit.hrl"). @@ -28,9 +28,10 @@ -spec(connect/3 :: (binary(), binary(), binary()) -> {'ok', {rabbit_types:user(), rabbit_framing:amqp_table()}}). --spec(start_channel/5 :: (rabbit_channel:channel_number(), pid(), - rabbit_types:user(), rabbit_types:vhost(), pid()) -> - {'ok', pid()}). +-spec(start_channel/6 :: + (rabbit_framing:amqp_table(), rabbit_channel:channel_number(), pid(), + rabbit_types:user(), rabbit_types:vhost(), pid()) -> + {'ok', pid()}). -endif. @@ -67,9 +68,10 @@ connect(Username, Password, VHost) -> {error, broker_not_found_on_node} end. -start_channel(Number, ClientChannelPid, User, VHost, Collector) -> +start_channel(Capabilities, Number, ClientChannelPid, User, VHost, Collector) -> {ok, _, {ChannelPid, _}} = supervisor2:start_child( rabbit_direct_client_sup, - [{direct, Number, ClientChannelPid, User, VHost, Collector}]), + [{direct, Capabilities, Number, ClientChannelPid, User, VHost, + Collector}]), {ok, ChannelPid}. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 1781469a..7753bb50 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -709,12 +709,18 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism, connection = Connection, sock = Sock}) -> AuthMechanism = auth_mechanism_to_module(Mechanism), + Capabilities = + case lists:keyfind(<<"capabilities">>, 1, ClientProperties) of + {<<"capabilities">>, table, Capabilities1} -> Capabilities1; + _ -> [] + end, State = State0#v1{auth_mechanism = AuthMechanism, auth_state = AuthMechanism:init(Sock), connection_state = securing, connection = Connection#connection{ - client_properties = ClientProperties}}, + client_properties = ClientProperties, + capabilities = Capabilities}}, auth_phase(Response, State); handle_method0(#'connection.secure_ok'{response = Response}, @@ -947,14 +953,15 @@ cert_info(F, Sock) -> send_to_new_channel(Channel, AnalyzedFrame, State) -> #v1{sock = Sock, queue_collector = Collector, channel_sup_sup_pid = ChanSupSup, - connection = #connection{protocol = Protocol, - frame_max = FrameMax, - user = User, - vhost = VHost}} = State, + connection = #connection{protocol = Protocol, + frame_max = FrameMax, + user = User, + vhost = VHost, + capabilities = Capabilities}} = State, {ok, _ChSupPid, {ChPid, AState}} = rabbit_channel_sup_sup:start_channel( - ChanSupSup, {tcp, Protocol, Sock, Channel, FrameMax, self(), User, - VHost, Collector}), + ChanSupSup, {tcp, Protocol, Capabilities, Sock, Channel, FrameMax, + self(), User, VHost, Collector}), erlang:monitor(process, ChPid), NewAState = process_channel_frame(AnalyzedFrame, self(), Channel, ChPid, AState), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 58c369b5..d6c85af1 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1019,7 +1019,7 @@ test_user_management() -> test_server_status() -> %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), - {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, + {ok, Ch} = rabbit_channel:start_link([], 1, self(), Writer, user(<<"user">>), <<"/">>, self(), fun (_) -> {ok, self()} end), [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], @@ -1079,7 +1079,7 @@ test_server_status() -> test_spawn(Receiver) -> Me = self(), Writer = spawn(fun () -> Receiver(Me) end), - {ok, Ch} = rabbit_channel:start_link(1, Me, Writer, + {ok, Ch} = rabbit_channel:start_link([], 1, Me, Writer, user(<<"guest">>), <<"/">>, self(), fun (_) -> {ok, self()} end), ok = rabbit_channel:do(Ch, #'channel.open'{}), |