summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-03-16 15:29:51 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-03-16 15:29:51 +0000
commit916f0d7414bdab0ce0b28be5c2f8b61af461b5ba (patch)
treef4a25cd5cf4634512b1eca1f97f2063da399277c
parentd38fe2887d29b677a15eb6bdad2ea55ce76c5405 (diff)
parentf4e4bdc1bfefd70bf3d11b40ecda8f67727d2424 (diff)
downloadrabbitmq-server-916f0d7414bdab0ce0b28be5c2f8b61af461b5ba.tar.gz
Merging bug23263 to default
-rw-r--r--src/rabbit_channel.erl72
-rw-r--r--src/rabbit_channel_sup.erl11
-rw-r--r--src/rabbit_direct.erl12
-rw-r--r--src/rabbit_tests.erl10
4 files changed, 55 insertions, 50 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index da103284..0c12614c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -20,7 +20,7 @@
-behaviour(gen_server2).
--export([start_link/9, do/2, do/3, flush/1, shutdown/1]).
+-export([start_link/10, do/2, do/3, flush/1, shutdown/1]).
-export([send_command/2, deliver/4, flushed/2, confirm/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([emit_stats/1, ready_for_close/1]).
@@ -29,9 +29,9 @@
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
prioritise_cast/2]).
--record(ch, {state, protocol, channel, reader_pid, writer_pid, limiter_pid,
- start_limiter_fun, transaction_id, tx_participants, next_tag,
- uncommitted_ack_q, unacked_message_q,
+-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
+ limiter_pid, start_limiter_fun, transaction_id, tx_participants,
+ next_tag, uncommitted_ack_q, unacked_message_q,
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, consumer_monitors, queue_collector_pid,
stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
@@ -67,8 +67,8 @@
-type(channel_number() :: non_neg_integer()).
--spec(start_link/9 ::
- (channel_number(), pid(), pid(), rabbit_types:protocol(),
+-spec(start_link/10 ::
+ (channel_number(), pid(), pid(), pid(), rabbit_types:protocol(),
rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
pid(), fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) ->
rabbit_types:ok_pid_or_error()).
@@ -96,11 +96,11 @@
%%----------------------------------------------------------------------------
-start_link(Channel, ReaderPid, WriterPid, Protocol, User, VHost, Capabilities,
- CollectorPid, StartLimiterFun) ->
+start_link(Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
+ Capabilities, CollectorPid, StartLimiterFun) ->
gen_server2:start_link(
- ?MODULE, [Channel, ReaderPid, WriterPid, Protocol, User, VHost,
- Capabilities, CollectorPid, StartLimiterFun], []).
+ ?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, Protocol, User,
+ VHost, Capabilities, CollectorPid, StartLimiterFun], []).
do(Pid, Method) ->
do(Pid, Method, none).
@@ -154,8 +154,8 @@ ready_for_close(Pid) ->
%%---------------------------------------------------------------------------
-init([Channel, ReaderPid, WriterPid, Protocol, User, VHost, Capabilities,
- CollectorPid, StartLimiterFun]) ->
+init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
+ Capabilities, CollectorPid, StartLimiterFun]) ->
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
StatsTimer = rabbit_event:init_stats_timer(),
@@ -164,6 +164,7 @@ init([Channel, ReaderPid, WriterPid, Protocol, User, VHost, Capabilities,
channel = Channel,
reader_pid = ReaderPid,
writer_pid = WriterPid,
+ conn_pid = ConnPid,
limiter_pid = undefined,
start_limiter_fun = StartLimiterFun,
transaction_id = none,
@@ -364,11 +365,12 @@ ok_msg(false, Msg) -> Msg.
send_exception(Reason, State = #ch{protocol = Protocol,
channel = Channel,
writer_pid = WriterPid,
- reader_pid = ReaderPid}) ->
+ reader_pid = ReaderPid,
+ conn_pid = ConnPid}) ->
{CloseChannel, CloseMethod} =
rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
rabbit_log:error("connection ~p, channel ~p - error:~n~p~n",
- [ReaderPid, Channel, Reason]),
+ [ConnPid, Channel, Reason]),
%% something bad's happened: rollback_and_notify may not be 'ok'
{_Result, State1} = rollback_and_notify(State),
case CloseChannel of
@@ -650,12 +652,12 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
_, State = #ch{writer_pid = WriterPid,
- reader_pid = ReaderPid,
+ conn_pid = ConnPid,
next_tag = DeliveryTag}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
case rabbit_amqqueue:with_exclusive_access_or_die(
- QueueName, ReaderPid,
+ QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
Msg = {_QName, QPid, _MsgId, Redelivered,
@@ -689,7 +691,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
no_ack = NoAck,
exclusive = ExclusiveConsume,
nowait = NoWait},
- _, State = #ch{reader_pid = ReaderPid,
+ _, State = #ch{conn_pid = ConnPid,
limiter_pid = LimiterPid,
consumer_mapping = ConsumerMapping}) ->
case dict:find(ConsumerTag, ConsumerMapping) of
@@ -706,7 +708,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
%% behalf. This is for symmetry with basic.cancel - see
%% the comment in that method for why.
case rabbit_amqqueue:with_exclusive_access_or_die(
- QueueName, ReaderPid,
+ QueueName, ConnPid,
fun (Q) ->
{rabbit_amqqueue:basic_consume(
Q, NoAck, self(), LimiterPid,
@@ -921,10 +923,10 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
nowait = NoWait,
arguments = Args} = Declare,
_, State = #ch{virtual_host = VHostPath,
- reader_pid = ReaderPid,
+ conn_pid = ConnPid,
queue_collector_pid = CollectorPid}) ->
Owner = case ExclusiveDeclare of
- true -> ReaderPid;
+ true -> ConnPid;
false -> none
end,
ActualNameBin = case QueueNameBin of
@@ -967,13 +969,13 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
passive = true,
nowait = NoWait},
_, State = #ch{virtual_host = VHostPath,
- reader_pid = ReaderPid}) ->
+ conn_pid = ConnPid}) ->
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
check_configure_permitted(QueueName, State),
{{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} =
rabbit_amqqueue:with_or_die(
QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end),
- ok = rabbit_amqqueue:check_exclusive_access(Q, ReaderPid),
+ ok = rabbit_amqqueue:check_exclusive_access(Q, ConnPid),
return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount,
State);
@@ -981,11 +983,11 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
if_unused = IfUnused,
if_empty = IfEmpty,
nowait = NoWait},
- _, State = #ch{reader_pid = ReaderPid}) ->
+ _, State = #ch{conn_pid = ConnPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_configure_permitted(QueueName, State),
case rabbit_amqqueue:with_exclusive_access_or_die(
- QueueName, ReaderPid,
+ QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
{error, in_use} ->
rabbit_misc:protocol_error(
@@ -1017,11 +1019,11 @@ handle_method(#'queue.unbind'{queue = QueueNameBin,
handle_method(#'queue.purge'{queue = QueueNameBin,
nowait = NoWait},
- _, State = #ch{reader_pid = ReaderPid}) ->
+ _, State = #ch{conn_pid = ConnPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
{ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die(
- QueueName, ReaderPid,
+ QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:purge(Q) end),
return_ok(State, NoWait,
#'queue.purge_ok'{message_count = PurgedMessageCount});
@@ -1142,7 +1144,7 @@ handle_consuming_queue_down(MRef, ConsumerTag,
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
- reader_pid = ReaderPid}) ->
+ conn_pid = ConnPid }) ->
%% FIXME: connection exception (!) on failure??
%% (see rule named "failure" in spec-XML)
%% FIXME: don't allow binding to internal exchanges -
@@ -1158,7 +1160,7 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
key = ActualRoutingKey,
args = Arguments},
fun (_X, Q = #amqqueue{}) ->
- try rabbit_amqqueue:check_exclusive_access(Q, ReaderPid)
+ try rabbit_amqqueue:check_exclusive_access(Q, ConnPid)
catch exit:Reason -> {error, Reason}
end;
(_X, #exchange{}) ->
@@ -1410,13 +1412,13 @@ coalesce_and_send(MsgSeqNos, MkMsgFun,
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
-i(pid, _) -> self();
-i(connection, #ch{reader_pid = ReaderPid}) -> ReaderPid;
-i(number, #ch{channel = Channel}) -> Channel;
-i(user, #ch{user = User}) -> User#user.username;
-i(vhost, #ch{virtual_host = VHost}) -> VHost;
-i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none;
-i(confirm, #ch{confirm_enabled = CE}) -> CE;
+i(pid, _) -> self();
+i(connection, #ch{conn_pid = ConnPid}) -> ConnPid;
+i(number, #ch{channel = Channel}) -> Channel;
+i(user, #ch{user = User}) -> User#user.username;
+i(vhost, #ch{virtual_host = VHost}) -> VHost;
+i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none;
+i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
dict:size(ConsumerMapping);
i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) ->
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index 8175ad80..65ccca02 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -58,20 +58,21 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, User, VHost,
supervisor2:start_child(
SupPid,
{channel, {rabbit_channel, start_link,
- [Channel, ReaderPid, WriterPid, Protocol, User, VHost,
- Capabilities, Collector, start_limiter_fun(SupPid)]},
+ [Channel, ReaderPid, WriterPid, ReaderPid, Protocol,
+ User, VHost, Capabilities, Collector,
+ start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, AState} = rabbit_command_assembler:init(Protocol),
{ok, SupPid, {ChannelPid, AState}};
-start_link({direct, Channel, ClientChannelPid, Protocol, User, VHost,
+start_link({direct, Channel, ClientChannelPid, ConnPid, Protocol, User, VHost,
Capabilities, Collector}) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, ChannelPid} =
supervisor2:start_child(
SupPid,
{channel, {rabbit_channel, start_link,
- [Channel, ClientChannelPid, ClientChannelPid, Protocol,
- User, VHost, Capabilities, Collector,
+ [Channel, ClientChannelPid, ClientChannelPid, ConnPid,
+ Protocol, User, VHost, Capabilities, Collector,
start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, SupPid, {ChannelPid, none}}.
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index a2693c69..0810c762 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -16,7 +16,7 @@
-module(rabbit_direct).
--export([boot/0, connect/4, start_channel/7]).
+-export([boot/0, connect/4, start_channel/8]).
-include("rabbit.hrl").
@@ -28,8 +28,8 @@
-spec(connect/4 :: (binary(), binary(), binary(), rabbit_types:protocol()) ->
{'ok', {rabbit_types:user(),
rabbit_framing:amqp_table()}}).
--spec(start_channel/7 ::
- (rabbit_channel:channel_number(), pid(), rabbit_types:protocol(),
+-spec(start_channel/8 ::
+ (rabbit_channel:channel_number(), pid(), pid(), rabbit_types:protocol(),
rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
pid()) -> {'ok', pid()}).
@@ -69,11 +69,11 @@ connect(Username, Password, VHost, Protocol) ->
{error, broker_not_found_on_node}
end.
-start_channel(Number, ClientChannelPid, Protocol, User, VHost, Capabilities,
- Collector) ->
+start_channel(Number, ClientChannelPid, ConnPid, Protocol, User, VHost,
+ Capabilities, Collector) ->
{ok, _, {ChannelPid, _}} =
supervisor2:start_child(
rabbit_direct_client_sup,
- [{direct, Number, ClientChannelPid, Protocol, User, VHost,
+ [{direct, Number, ClientChannelPid, ConnPid, Protocol, User, VHost,
Capabilities, Collector}]),
{ok, ChannelPid}.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 930923e8..b8c3f4a9 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1121,8 +1121,9 @@ test_server_status() ->
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
{ok, Ch} = rabbit_channel:start_link(
- 1, self(), Writer, rabbit_framing_amqp_0_9_1, user(<<"user">>),
- <<"/">>, [], self(), fun (_) -> {ok, self()} end),
+ 1, self(), Writer, self(), rabbit_framing_amqp_0_9_1,
+ user(<<"user">>), <<"/">>, [], self(),
+ fun (_) -> {ok, self()} end),
[Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
{new, Queue = #amqqueue{}} <-
[rabbit_amqqueue:declare(
@@ -1181,8 +1182,9 @@ test_spawn(Receiver) ->
Me = self(),
Writer = spawn(fun () -> Receiver(Me) end),
{ok, Ch} = rabbit_channel:start_link(
- 1, Me, Writer, rabbit_framing_amqp_0_9_1, user(<<"guest">>),
- <<"/">>, [], self(), fun (_) -> {ok, self()} end),
+ 1, Me, Writer, Me, rabbit_framing_amqp_0_9_1,
+ user(<<"guest">>), <<"/">>, [], self(),
+ fun (_) -> {ok, self()} end),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
receive #'channel.open_ok'{} -> ok
after 1000 -> throw(failed_to_receive_channel_open_ok)