diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-16 15:29:51 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-16 15:29:51 +0000 |
commit | 916f0d7414bdab0ce0b28be5c2f8b61af461b5ba (patch) | |
tree | f4a25cd5cf4634512b1eca1f97f2063da399277c | |
parent | d38fe2887d29b677a15eb6bdad2ea55ce76c5405 (diff) | |
parent | f4e4bdc1bfefd70bf3d11b40ecda8f67727d2424 (diff) | |
download | rabbitmq-server-916f0d7414bdab0ce0b28be5c2f8b61af461b5ba.tar.gz |
Merging bug23263 to default
-rw-r--r-- | src/rabbit_channel.erl | 72 | ||||
-rw-r--r-- | src/rabbit_channel_sup.erl | 11 | ||||
-rw-r--r-- | src/rabbit_direct.erl | 12 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 10 |
4 files changed, 55 insertions, 50 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index da103284..0c12614c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -20,7 +20,7 @@ -behaviour(gen_server2). --export([start_link/9, do/2, do/3, flush/1, shutdown/1]). +-export([start_link/10, do/2, do/3, flush/1, shutdown/1]). -export([send_command/2, deliver/4, flushed/2, confirm/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([emit_stats/1, ready_for_close/1]). @@ -29,9 +29,9 @@ handle_info/2, handle_pre_hibernate/1, prioritise_call/3, prioritise_cast/2]). --record(ch, {state, protocol, channel, reader_pid, writer_pid, limiter_pid, - start_limiter_fun, transaction_id, tx_participants, next_tag, - uncommitted_ack_q, unacked_message_q, +-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, + limiter_pid, start_limiter_fun, transaction_id, tx_participants, + next_tag, uncommitted_ack_q, unacked_message_q, user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, consumer_monitors, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, @@ -67,8 +67,8 @@ -type(channel_number() :: non_neg_integer()). --spec(start_link/9 :: - (channel_number(), pid(), pid(), rabbit_types:protocol(), +-spec(start_link/10 :: + (channel_number(), pid(), pid(), pid(), rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), pid(), fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) -> rabbit_types:ok_pid_or_error()). @@ -96,11 +96,11 @@ %%---------------------------------------------------------------------------- -start_link(Channel, ReaderPid, WriterPid, Protocol, User, VHost, Capabilities, - CollectorPid, StartLimiterFun) -> +start_link(Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, + Capabilities, CollectorPid, StartLimiterFun) -> gen_server2:start_link( - ?MODULE, [Channel, ReaderPid, WriterPid, Protocol, User, VHost, - Capabilities, CollectorPid, StartLimiterFun], []). + ?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, + VHost, Capabilities, CollectorPid, StartLimiterFun], []). do(Pid, Method) -> do(Pid, Method, none). @@ -154,8 +154,8 @@ ready_for_close(Pid) -> %%--------------------------------------------------------------------------- -init([Channel, ReaderPid, WriterPid, Protocol, User, VHost, Capabilities, - CollectorPid, StartLimiterFun]) -> +init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, + Capabilities, CollectorPid, StartLimiterFun]) -> process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), StatsTimer = rabbit_event:init_stats_timer(), @@ -164,6 +164,7 @@ init([Channel, ReaderPid, WriterPid, Protocol, User, VHost, Capabilities, channel = Channel, reader_pid = ReaderPid, writer_pid = WriterPid, + conn_pid = ConnPid, limiter_pid = undefined, start_limiter_fun = StartLimiterFun, transaction_id = none, @@ -364,11 +365,12 @@ ok_msg(false, Msg) -> Msg. send_exception(Reason, State = #ch{protocol = Protocol, channel = Channel, writer_pid = WriterPid, - reader_pid = ReaderPid}) -> + reader_pid = ReaderPid, + conn_pid = ConnPid}) -> {CloseChannel, CloseMethod} = rabbit_binary_generator:map_exception(Channel, Reason, Protocol), rabbit_log:error("connection ~p, channel ~p - error:~n~p~n", - [ReaderPid, Channel, Reason]), + [ConnPid, Channel, Reason]), %% something bad's happened: rollback_and_notify may not be 'ok' {_Result, State1} = rollback_and_notify(State), case CloseChannel of @@ -650,12 +652,12 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, _, State = #ch{writer_pid = WriterPid, - reader_pid = ReaderPid, + conn_pid = ConnPid, next_tag = DeliveryTag}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), case rabbit_amqqueue:with_exclusive_access_or_die( - QueueName, ReaderPid, + QueueName, ConnPid, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, Msg = {_QName, QPid, _MsgId, Redelivered, @@ -689,7 +691,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, no_ack = NoAck, exclusive = ExclusiveConsume, nowait = NoWait}, - _, State = #ch{reader_pid = ReaderPid, + _, State = #ch{conn_pid = ConnPid, limiter_pid = LimiterPid, consumer_mapping = ConsumerMapping}) -> case dict:find(ConsumerTag, ConsumerMapping) of @@ -706,7 +708,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, %% behalf. This is for symmetry with basic.cancel - see %% the comment in that method for why. case rabbit_amqqueue:with_exclusive_access_or_die( - QueueName, ReaderPid, + QueueName, ConnPid, fun (Q) -> {rabbit_amqqueue:basic_consume( Q, NoAck, self(), LimiterPid, @@ -921,10 +923,10 @@ handle_method(#'queue.declare'{queue = QueueNameBin, nowait = NoWait, arguments = Args} = Declare, _, State = #ch{virtual_host = VHostPath, - reader_pid = ReaderPid, + conn_pid = ConnPid, queue_collector_pid = CollectorPid}) -> Owner = case ExclusiveDeclare of - true -> ReaderPid; + true -> ConnPid; false -> none end, ActualNameBin = case QueueNameBin of @@ -967,13 +969,13 @@ handle_method(#'queue.declare'{queue = QueueNameBin, passive = true, nowait = NoWait}, _, State = #ch{virtual_host = VHostPath, - reader_pid = ReaderPid}) -> + conn_pid = ConnPid}) -> QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), check_configure_permitted(QueueName, State), {{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} = rabbit_amqqueue:with_or_die( QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end), - ok = rabbit_amqqueue:check_exclusive_access(Q, ReaderPid), + ok = rabbit_amqqueue:check_exclusive_access(Q, ConnPid), return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, State); @@ -981,11 +983,11 @@ handle_method(#'queue.delete'{queue = QueueNameBin, if_unused = IfUnused, if_empty = IfEmpty, nowait = NoWait}, - _, State = #ch{reader_pid = ReaderPid}) -> + _, State = #ch{conn_pid = ConnPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_configure_permitted(QueueName, State), case rabbit_amqqueue:with_exclusive_access_or_die( - QueueName, ReaderPid, + QueueName, ConnPid, fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of {error, in_use} -> rabbit_misc:protocol_error( @@ -1017,11 +1019,11 @@ handle_method(#'queue.unbind'{queue = QueueNameBin, handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, - _, State = #ch{reader_pid = ReaderPid}) -> + _, State = #ch{conn_pid = ConnPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), {ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die( - QueueName, ReaderPid, + QueueName, ConnPid, fun (Q) -> rabbit_amqqueue:purge(Q) end), return_ok(State, NoWait, #'queue.purge_ok'{message_count = PurgedMessageCount}); @@ -1142,7 +1144,7 @@ handle_consuming_queue_down(MRef, ConsumerTag, binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath, - reader_pid = ReaderPid}) -> + conn_pid = ConnPid }) -> %% FIXME: connection exception (!) on failure?? %% (see rule named "failure" in spec-XML) %% FIXME: don't allow binding to internal exchanges - @@ -1158,7 +1160,7 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, key = ActualRoutingKey, args = Arguments}, fun (_X, Q = #amqqueue{}) -> - try rabbit_amqqueue:check_exclusive_access(Q, ReaderPid) + try rabbit_amqqueue:check_exclusive_access(Q, ConnPid) catch exit:Reason -> {error, Reason} end; (_X, #exchange{}) -> @@ -1410,13 +1412,13 @@ coalesce_and_send(MsgSeqNos, MkMsgFun, infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. -i(pid, _) -> self(); -i(connection, #ch{reader_pid = ReaderPid}) -> ReaderPid; -i(number, #ch{channel = Channel}) -> Channel; -i(user, #ch{user = User}) -> User#user.username; -i(vhost, #ch{virtual_host = VHost}) -> VHost; -i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none; -i(confirm, #ch{confirm_enabled = CE}) -> CE; +i(pid, _) -> self(); +i(connection, #ch{conn_pid = ConnPid}) -> ConnPid; +i(number, #ch{channel = Channel}) -> Channel; +i(user, #ch{user = User}) -> User#user.username; +i(vhost, #ch{virtual_host = VHost}) -> VHost; +i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none; +i(confirm, #ch{confirm_enabled = CE}) -> CE; i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> dict:size(ConsumerMapping); i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) -> diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 8175ad80..65ccca02 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -58,20 +58,21 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, User, VHost, supervisor2:start_child( SupPid, {channel, {rabbit_channel, start_link, - [Channel, ReaderPid, WriterPid, Protocol, User, VHost, - Capabilities, Collector, start_limiter_fun(SupPid)]}, + [Channel, ReaderPid, WriterPid, ReaderPid, Protocol, + User, VHost, Capabilities, Collector, + start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, AState} = rabbit_command_assembler:init(Protocol), {ok, SupPid, {ChannelPid, AState}}; -start_link({direct, Channel, ClientChannelPid, Protocol, User, VHost, +start_link({direct, Channel, ClientChannelPid, ConnPid, Protocol, User, VHost, Capabilities, Collector}) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, ChannelPid} = supervisor2:start_child( SupPid, {channel, {rabbit_channel, start_link, - [Channel, ClientChannelPid, ClientChannelPid, Protocol, - User, VHost, Capabilities, Collector, + [Channel, ClientChannelPid, ClientChannelPid, ConnPid, + Protocol, User, VHost, Capabilities, Collector, start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, SupPid, {ChannelPid, none}}. diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index a2693c69..0810c762 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -16,7 +16,7 @@ -module(rabbit_direct). --export([boot/0, connect/4, start_channel/7]). +-export([boot/0, connect/4, start_channel/8]). -include("rabbit.hrl"). @@ -28,8 +28,8 @@ -spec(connect/4 :: (binary(), binary(), binary(), rabbit_types:protocol()) -> {'ok', {rabbit_types:user(), rabbit_framing:amqp_table()}}). --spec(start_channel/7 :: - (rabbit_channel:channel_number(), pid(), rabbit_types:protocol(), +-spec(start_channel/8 :: + (rabbit_channel:channel_number(), pid(), pid(), rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), pid()) -> {'ok', pid()}). @@ -69,11 +69,11 @@ connect(Username, Password, VHost, Protocol) -> {error, broker_not_found_on_node} end. -start_channel(Number, ClientChannelPid, Protocol, User, VHost, Capabilities, - Collector) -> +start_channel(Number, ClientChannelPid, ConnPid, Protocol, User, VHost, + Capabilities, Collector) -> {ok, _, {ChannelPid, _}} = supervisor2:start_child( rabbit_direct_client_sup, - [{direct, Number, ClientChannelPid, Protocol, User, VHost, + [{direct, Number, ClientChannelPid, ConnPid, Protocol, User, VHost, Capabilities, Collector}]), {ok, ChannelPid}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 930923e8..b8c3f4a9 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1121,8 +1121,9 @@ test_server_status() -> %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), {ok, Ch} = rabbit_channel:start_link( - 1, self(), Writer, rabbit_framing_amqp_0_9_1, user(<<"user">>), - <<"/">>, [], self(), fun (_) -> {ok, self()} end), + 1, self(), Writer, self(), rabbit_framing_amqp_0_9_1, + user(<<"user">>), <<"/">>, [], self(), + fun (_) -> {ok, self()} end), [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], {new, Queue = #amqqueue{}} <- [rabbit_amqqueue:declare( @@ -1181,8 +1182,9 @@ test_spawn(Receiver) -> Me = self(), Writer = spawn(fun () -> Receiver(Me) end), {ok, Ch} = rabbit_channel:start_link( - 1, Me, Writer, rabbit_framing_amqp_0_9_1, user(<<"guest">>), - <<"/">>, [], self(), fun (_) -> {ok, self()} end), + 1, Me, Writer, Me, rabbit_framing_amqp_0_9_1, + user(<<"guest">>), <<"/">>, [], self(), + fun (_) -> {ok, self()} end), ok = rabbit_channel:do(Ch, #'channel.open'{}), receive #'channel.open_ok'{} -> ok after 1000 -> throw(failed_to_receive_channel_open_ok) |