summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-11-23 13:06:47 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-11-23 13:06:47 +0000
commitc3518550efbf99a229ee90f29710f0effac7db6b (patch)
tree848c086f60c3540c43a7981ef05e59fcae7a2b0c
parentd7bfdd0573b7648bf521ab8300be42782f873c56 (diff)
parenta01cf9e7870250ccdd0129c5cbb9b881581f4fc8 (diff)
downloadrabbitmq-server-c3518550efbf99a229ee90f29710f0effac7db6b.tar.gz
merge bug23625 into default
-rw-r--r--src/rabbit_amqqueue.erl26
-rw-r--r--src/rabbit_amqqueue_process.erl50
-rw-r--r--src/rabbit_channel.erl136
-rw-r--r--src/rabbit_tests.erl31
4 files changed, 137 insertions, 106 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 8ce1160c..be7c7867 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -34,7 +34,7 @@
-export([start_mirroring/1, stop_mirroring/1]).
%% internal
--export([internal_declare/2, internal_delete/2, run_backing_queue/3,
+-export([internal_declare/2, internal_delete/1, run_backing_queue/3,
set_ram_duration_target/2, set_maximum_since_use/2]).
-include("rabbit.hrl").
@@ -156,11 +156,11 @@
-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(flush_all/2 :: (qpids(), pid()) -> 'ok').
--spec(internal_delete/2 ::
- (name(), pid()) -> rabbit_types:ok_or_error('not_found') |
- rabbit_types:connection_exit() |
- fun (() -> rabbit_types:ok_or_error('not_found') |
- rabbit_types:connection_exit())).
+-spec(internal_delete/1 ::
+ (name()) -> rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit() |
+ fun (() -> rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit())).
-spec(run_backing_queue/3 ::
(pid(), atom(),
(fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
@@ -257,7 +257,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
[ExistingQ = #amqqueue{pid = QPid}] ->
case rabbit_misc:is_process_alive(QPid) of
true -> rabbit_misc:const(ExistingQ);
- false -> TailFun = internal_delete(QueueName, QPid),
+ false -> TailFun = internal_delete(QueueName),
fun () -> TailFun(), ExistingQ end
end
end
@@ -563,7 +563,7 @@ internal_delete1(QueueName) ->
%% after the transaction.
rabbit_binding:remove_for_destination(QueueName).
-internal_delete(QueueName, QPid) ->
+internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
@@ -573,8 +573,7 @@ internal_delete(QueueName, QPid) ->
fun() ->
ok = T(),
ok = rabbit_event:notify(queue_deleted,
- [{pid, QPid},
- {name, QueueName}])
+ [{name, QueueName}])
end
end
end).
@@ -594,7 +593,7 @@ stop_mirroring(QPid) -> ok = delegate_call(QPid, stop_mirroring).
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> QsDels =
- qlc:e(qlc:q([{{QName, Pid}, delete_queue(QName)} ||
+ qlc:e(qlc:q([{QName, delete_queue(QName)} ||
#amqqueue{name = QName, pid = Pid,
slave_pids = []}
<- mnesia:table(rabbit_queue),
@@ -607,10 +606,9 @@ on_node_down(Node) ->
fun () ->
T(),
lists:foreach(
- fun({QName, QPid}) ->
+ fun(QName) ->
ok = rabbit_event:notify(queue_deleted,
- [{pid, QPid},
- {name, QName}])
+ [{name, QName}])
end, Qs)
end
end).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b3c44a50..a3e4ca31 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -85,7 +85,7 @@
%%----------------------------------------------------------------------------
-define(STATISTICS_KEYS,
- [pid,
+ [name,
policy,
exclusive_consumer_pid,
exclusive_consumer_tag,
@@ -101,16 +101,14 @@
]).
-define(CREATION_EVENT_KEYS,
- [pid,
- name,
+ [name,
durable,
auto_delete,
arguments,
owner_pid
]).
--define(INFO_KEYS,
- ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [name]).
%%----------------------------------------------------------------------------
@@ -183,7 +181,7 @@ terminate(Reason, State = #q{q = #amqqueue{name = QName},
fun (BQS) ->
BQS1 = BQ:delete_and_terminate(Reason, BQS),
%% don't care if the internal delete doesn't return 'ok'.
- rabbit_amqqueue:internal_delete(QName, self()),
+ rabbit_amqqueue:internal_delete(QName),
BQS1
end, State).
@@ -277,7 +275,8 @@ terminate_shutdown(Fun, State) ->
case BQS of
undefined -> State1;
_ -> ok = rabbit_memory_monitor:deregister(self()),
- [emit_consumer_deleted(Ch, CTag)
+ QName = qname(State),
+ [emit_consumer_deleted(Ch, CTag, QName)
|| {Ch, CTag, _} <- consumers(State1)],
State1#q{backing_queue_state = Fun(BQS)}
end.
@@ -605,9 +604,9 @@ remove_consumer(ChPid, ConsumerTag, Queue) ->
(CP /= ChPid) or (CTag /= ConsumerTag)
end, Queue).
-remove_consumers(ChPid, Queue) ->
+remove_consumers(ChPid, Queue, QName) ->
queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid ->
- emit_consumer_deleted(ChPid, CTag),
+ emit_consumer_deleted(ChPid, CTag, QName),
false;
(_) ->
true
@@ -648,7 +647,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
blocked_consumers = Blocked} ->
- _ = remove_consumers(ChPid, Blocked), %% for stats emission
+ QName = qname(State),
+ _ = remove_consumers(ChPid, Blocked, QName), %% for stats emission
ok = erase_ch_record(C),
State1 = State#q{
exclusive_consumer = case Holder of
@@ -656,7 +656,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
Other -> Other
end,
active_consumers = remove_consumers(
- ChPid, State#q.active_consumers),
+ ChPid, State#q.active_consumers,
+ QName),
senders = Senders1},
case should_auto_delete(State1) of
true -> {stop, State1};
@@ -955,19 +956,19 @@ emit_stats(State) ->
emit_stats(State, Extra) ->
rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)).
-emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) ->
+emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired, QName) ->
rabbit_event:notify(consumer_created,
[{consumer_tag, ConsumerTag},
{exclusive, Exclusive},
{ack_required, AckRequired},
{channel, ChPid},
- {queue, self()}]).
+ {queue, QName}]).
-emit_consumer_deleted(ChPid, ConsumerTag) ->
+emit_consumer_deleted(ChPid, ConsumerTag, QName) ->
rabbit_event:notify(consumer_deleted,
[{consumer_tag, ConsumerTag},
{channel, ChPid},
- {queue, self()}]).
+ {queue, QName}]).
%%----------------------------------------------------------------------------
@@ -1076,9 +1077,8 @@ handle_call({basic_get, ChPid, NoAck}, _From,
handle_call({basic_consume, NoAck, ChPid, Limiter,
ConsumerTag, ExclusiveConsume, OkMsg},
- _From, State = #q{exclusive_consumer = ExistingHolder}) ->
- case check_exclusive_access(ExistingHolder, ExclusiveConsume,
- State) of
+ _From, State = #q{exclusive_consumer = Holder}) ->
+ case check_exclusive_access(Holder, ExclusiveConsume, State) of
in_use ->
reply({error, exclusive_consume_unavailable}, State);
ok ->
@@ -1087,7 +1087,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
- true -> ExistingHolder
+ true -> Holder
end,
State1 = State#q{has_had_consumers = true,
exclusive_consumer = ExclusiveConsumer},
@@ -1102,7 +1102,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
run_message_queue(State1#q{active_consumers = AC1})
end,
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck),
+ not NoAck, qname(State2)),
reply(ok, State2)
end;
@@ -1113,7 +1113,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
not_found ->
reply(ok, State);
C = #cr{blocked_consumers = Blocked} ->
- emit_consumer_deleted(ChPid, ConsumerTag),
+ emit_consumer_deleted(ChPid, ConsumerTag, qname(State)),
Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked),
update_consumer_count(C#cr{blocked_consumers = Blocked1}, -1),
State1 = State#q{
@@ -1123,7 +1123,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
end,
active_consumers = remove_consumer(
ChPid, ConsumerTag,
- State#q.active_consumers)},
+ State#q.active_consumers)},
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
true -> stop(From, ok, State1)
@@ -1174,11 +1174,13 @@ handle_call(stop_mirroring, _From, State = #q{backing_queue = BQ,
handle_call(force_event_refresh, _From,
State = #q{exclusive_consumer = Exclusive}) ->
rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
+ QName = qname(State),
case Exclusive of
- none -> [emit_consumer_created(Ch, CTag, false, AckRequired) ||
+ none -> [emit_consumer_created(
+ Ch, CTag, false, AckRequired, QName) ||
{Ch, CTag, AckRequired} <- consumers(State)];
{Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State),
- emit_consumer_created(Ch, CTag, true, AckRequired)
+ emit_consumer_created(Ch, CTag, true, AckRequired, QName)
end,
reply(ok, State).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b97af6d8..b1ef3b6b 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -35,8 +35,9 @@
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
conn_name, limiter, tx_status, next_tag, unacked_message_q,
uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user,
- virtual_host, most_recently_declared_queue, queue_monitors,
- consumer_mapping, blocking, queue_consumers, delivering_queues,
+ virtual_host, most_recently_declared_queue,
+ queue_names, queue_monitors, consumer_mapping,
+ blocking, queue_consumers, delivering_queues,
queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
unconfirmed, confirmed, capabilities, trace_state}).
@@ -194,6 +195,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
+ queue_names = dict:new(),
queue_monitors = pmon:new(),
consumer_mapping = dict:new(),
blocking = sets:new(),
@@ -334,9 +336,13 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State3 = handle_consuming_queue_down(QPid, State2),
State4 = handle_delivering_queue_down(QPid, State3),
credit_flow:peer_down(QPid),
- erase_queue_stats(QPid),
- noreply(State4#ch{queue_monitors = pmon:erase(
- QPid, State4#ch.queue_monitors)});
+ #ch{queue_names = QNames, queue_monitors = QMons} = State4,
+ case dict:find(QPid, QNames) of
+ {ok, QName} -> erase_queue_stats(QName);
+ error -> ok
+ end,
+ noreply(State4#ch{queue_names = dict:erase(QPid, QNames),
+ queue_monitors = pmon:erase(QPid, QMons)});
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -677,7 +683,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
- Msg = {_QName, QPid, _MsgId, Redelivered,
+ Msg = {QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
routing_keys = [RoutingKey | _CcRoutes],
content = Content}}} ->
@@ -689,7 +695,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
routing_key = RoutingKey,
message_count = MessageCount},
Content),
- State1 = monitor_delivering_queue(NoAck, QPid, State),
+ State1 = monitor_delivering_queue(NoAck, QPid, QName, State),
{noreply, record_sent(none, not(NoAck), Msg, State1)};
empty ->
{reply, #'basic.get_empty'{}, State}
@@ -728,10 +734,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
consumer_tag = ActualConsumerTag})),
Q}
end) of
- {ok, Q = #amqqueue{pid = QPid}} ->
+ {ok, Q = #amqqueue{pid = QPid, name = QName}} ->
CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping),
State1 = monitor_delivering_queue(
- NoAck, QPid, State#ch{consumer_mapping = CM1}),
+ NoAck, QPid, QName,
+ State#ch{consumer_mapping = CM1}),
{noreply,
case NoWait of
true -> consumer_monitor(ActualConsumerTag, State1);
@@ -1126,9 +1133,12 @@ consumer_monitor(ConsumerTag,
State
end.
-monitor_delivering_queue(NoAck, QPid, State = #ch{queue_monitors = QMons,
- delivering_queues = DQ}) ->
- State#ch{queue_monitors = pmon:monitor(QPid, QMons),
+monitor_delivering_queue(NoAck, QPid, QName,
+ State = #ch{queue_names = QNames,
+ queue_monitors = QMons,
+ delivering_queues = DQ}) ->
+ State#ch{queue_names = dict:store(QPid, QName, QNames),
+ queue_monitors = pmon:monitor(QPid, QMons),
delivering_queues = case NoAck of
true -> DQ;
false -> sets:add_element(QPid, DQ)
@@ -1233,18 +1243,18 @@ reject(Requeue, Acked, Limiter) ->
ok = notify_limiter(Limiter, Acked).
record_sent(ConsumerTag, AckRequired,
- Msg = {_QName, QPid, MsgId, Redelivered, _Message},
+ Msg = {QName, QPid, MsgId, Redelivered, _Message},
State = #ch{unacked_message_q = UAMQ,
next_tag = DeliveryTag,
trace_state = TraceState}) ->
- incr_stats([{queue_stats, QPid, 1}], case {ConsumerTag, AckRequired} of
- {none, true} -> get;
- {none, false} -> get_no_ack;
- {_ , true} -> deliver;
- {_ , false} -> deliver_no_ack
- end, State),
+ incr_stats([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of
+ {none, true} -> get;
+ {none, false} -> get_no_ack;
+ {_ , true} -> deliver;
+ {_ , false} -> deliver_no_ack
+ end, State),
case Redelivered of
- true -> incr_stats([{queue_stats, QPid, 1}], redeliver, State);
+ true -> incr_stats([{queue_stats, QName, 1}], redeliver, State);
false -> ok
end,
rabbit_trace:tap_trace_out(Msg, TraceState),
@@ -1277,11 +1287,15 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
precondition_failed("unknown delivery tag ~w", [DeliveryTag])
end.
-ack(Acked, State) ->
+ack(Acked, State = #ch{queue_names = QNames}) ->
Incs = fold_per_queue(
fun (QPid, MsgIds, L) ->
ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
- [{queue_stats, QPid, length(MsgIds)} | L]
+ case dict:find(QPid, QNames) of
+ {ok, QName} -> Count = length(MsgIds),
+ [{queue_stats, QName, Count} | L];
+ error -> L
+ end
end, [], Acked),
ok = notify_limiter(State#ch.limiter, Acked),
incr_stats(Incs, ack, State).
@@ -1339,23 +1353,42 @@ notify_limiter(Limiter, Acked) ->
deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
exchange_name = XName},
msg_seq_no = MsgSeqNo},
- QNames}, State) ->
- {RoutingRes, DeliveredQPids} =
- rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery),
- State1 = State#ch{queue_monitors =
- pmon:monitor_all(DeliveredQPids,
- State#ch.queue_monitors)},
- State2 = process_routing_result(RoutingRes, DeliveredQPids,
- XName, MsgSeqNo, Message, State1),
+ DelQNames}, State = #ch{queue_names = QNames,
+ queue_monitors = QMons}) ->
+ Qs = rabbit_amqqueue:lookup(DelQNames),
+ {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver_flow(Qs, Delivery),
+ %% The pmon:monitor_all/2 monitors all queues to which we
+ %% delivered. But we want to monitor even queues we didn't deliver
+ %% to, since we need their 'DOWN' messages to clean
+ %% queue_names. So we also need to monitor each QPid from
+ %% queues. But that only gets the masters (which is fine for
+ %% cleaning queue_names), so we need the union of both.
+ %%
+ %% ...and we need to add even non-delivered queues to queue_names
+ %% since alternative algorithms to update queue_names less
+ %% frequently would in fact be more expensive in the common case.
+ {QNames1, QMons1} =
+ lists:foldl(fun (#amqqueue{pid = QPid, name = QName},
+ {QNames0, QMons0}) ->
+ {case dict:is_key(QPid, QNames0) of
+ true -> QNames0;
+ false -> dict:store(QPid, QName, QNames0)
+ end, pmon:monitor(QPid, QMons0)}
+ end, {QNames, pmon:monitor_all(DeliveredQPids, QMons)}, Qs),
+ State1 = process_routing_result(RoutingRes, DeliveredQPids,
+ XName, MsgSeqNo, Message,
+ State#ch{queue_names = QNames1,
+ queue_monitors = QMons1}),
incr_stats([{exchange_stats, XName, 1} |
- [{queue_exchange_stats, {QPid, XName}, 1} ||
- QPid <- DeliveredQPids]], publish, State2),
- State2.
+ [{queue_exchange_stats, {QName, XName}, 1} ||
+ QPid <- DeliveredQPids,
+ {ok, QName} <- [dict:find(QPid, QNames1)]]],
+ publish, State1),
+ State1.
process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_route),
- incr_stats([{exchange_stats, Msg#basic_message.exchange_name, 1}],
- return_unroutable, State),
+ incr_stats([{exchange_stats, XName, 1}], return_unroutable, State),
record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
record_confirm(MsgSeqNo, XName, State);
@@ -1489,24 +1522,23 @@ emit_stats(State) ->
emit_stats(State, []).
emit_stats(State, Extra) ->
- CoarseStats = infos(?STATISTICS_KEYS, State),
+ Coarse = infos(?STATISTICS_KEYS, State),
case rabbit_event:stats_level(State, #ch.stats_timer) of
- coarse ->
- rabbit_event:notify(channel_stats, Extra ++ CoarseStats);
- fine ->
- FineStats =
- [{channel_queue_stats,
- [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]},
- {channel_exchange_stats,
- [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]},
- {channel_queue_exchange_stats,
- [{QX, Stats} ||
- {{queue_exchange_stats, QX}, Stats} <- get()]}],
- rabbit_event:notify(channel_stats,
- Extra ++ CoarseStats ++ FineStats)
+ coarse -> rabbit_event:notify(channel_stats, Extra ++ Coarse);
+ fine -> Fine = [{channel_queue_stats,
+ [{QName, Stats} ||
+ {{queue_stats, QName}, Stats} <- get()]},
+ {channel_exchange_stats,
+ [{XName, Stats} ||
+ {{exchange_stats, XName}, Stats} <- get()]},
+ {channel_queue_exchange_stats,
+ [{QX, Stats} ||
+ {{queue_exchange_stats, QX}, Stats} <- get()]}],
+ rabbit_event:notify(channel_stats, Extra ++ Coarse ++ Fine)
end.
-erase_queue_stats(QPid) ->
- erase({queue_stats, QPid}),
+erase_queue_stats(QName) ->
+ erase({queue_stats, QName}),
[erase({queue_exchange_stats, QX}) ||
- {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0].
+ {{queue_exchange_stats, QX = {QName0, _}}, _} <- get(),
+ QName0 =:= QName].
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 176374ce..03500d71 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1286,8 +1286,7 @@ test_statistics() ->
QName = receive #'queue.declare_ok'{queue = Q0} -> Q0
after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok)
end,
- {ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)),
- QPid = Q#amqqueue.pid,
+ QRes = rabbit_misc:r(<<"/">>, queue, QName),
X = rabbit_misc:r(<<"/">>, exchange, <<"">>),
rabbit_tests_event_receiver:start(self(), [node()], [channel_stats]),
@@ -1311,9 +1310,9 @@ test_statistics() ->
length(proplists:get_value(
channel_queue_exchange_stats, E)) > 0
end),
- [{QPid,[{get,1}]}] = proplists:get_value(channel_queue_stats, Event2),
+ [{QRes, [{get,1}]}] = proplists:get_value(channel_queue_stats, Event2),
[{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event2),
- [{{QPid,X},[{publish,1}]}] =
+ [{{QRes,X},[{publish,1}]}] =
proplists:get_value(channel_queue_exchange_stats, Event2),
%% Check the stats remove stuff on queue deletion
@@ -1338,31 +1337,31 @@ test_refresh_events(SecondaryNode) ->
[channel_created, queue_created]),
{_Writer, Ch} = test_spawn(),
- expect_events(Ch, channel_created),
+ expect_events(pid, Ch, channel_created),
rabbit_channel:shutdown(Ch),
{_Writer2, Ch2} = test_spawn(SecondaryNode),
- expect_events(Ch2, channel_created),
+ expect_events(pid, Ch2, channel_created),
rabbit_channel:shutdown(Ch2),
- {new, #amqqueue { pid = QPid } = Q} =
+ {new, #amqqueue{name = QName} = Q} =
rabbit_amqqueue:declare(test_queue(), false, false, [], none),
- expect_events(QPid, queue_created),
+ expect_events(name, QName, queue_created),
rabbit_amqqueue:delete(Q, false, false),
rabbit_tests_event_receiver:stop(),
passed.
-expect_events(Pid, Type) ->
- expect_event(Pid, Type),
+expect_events(Tag, Key, Type) ->
+ expect_event(Tag, Key, Type),
rabbit:force_event_refresh(),
- expect_event(Pid, Type).
+ expect_event(Tag, Key, Type).
-expect_event(Pid, Type) ->
+expect_event(Tag, Key, Type) ->
receive #event{type = Type, props = Props} ->
- case pget(pid, Props) of
- Pid -> ok;
- _ -> expect_event(Pid, Type)
+ case pget(Tag, Props) of
+ Key -> ok;
+ _ -> expect_event(Tag, Key, Type)
end
after ?TIMEOUT -> throw({failed_to_receive_event, Type})
end.
@@ -2562,7 +2561,7 @@ test_queue_recover() ->
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
_VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2),
- rabbit_amqqueue:internal_delete(QName, QPid1)
+ rabbit_amqqueue:internal_delete(QName)
end),
passed.