author     Matthias Radestock <matthias@rabbitmq.com>  2012-11-27 18:23:13 +0000
committer  Matthias Radestock <matthias@rabbitmq.com>  2012-11-27 18:23:13 +0000
commit     ba3741561ccefd4e9d16a75d9edd11085b1e7161 (patch)
tree       2e71b47f08f323225e61e1926abda8a98c10bf6d
parent     9e8f4fc04ed36a6d7243cf050ce590e14375e15e (diff)
parent     6f965d3ddfce609f3da3e275840d2d0351b66baf (diff)
download   rabbitmq-server-ba3741561ccefd4e9d16a75d9edd11085b1e7161.tar.gz
re-merge bug25303 into default
-rw-r--r--  README                                 2
-rw-r--r--  include/rabbit.hrl                     3
-rw-r--r--  src/rabbit.erl                         8
-rw-r--r--  src/rabbit_amqqueue.erl               30
-rw-r--r--  src/rabbit_amqqueue_process.erl      119
-rw-r--r--  src/rabbit_backing_queue.erl          14
-rw-r--r--  src/rabbit_backing_queue_qc.erl        8
-rw-r--r--  src/rabbit_channel.erl               136
-rw-r--r--  src/rabbit_mirror_queue_master.erl    62
-rw-r--r--  src/rabbit_mirror_queue_misc.erl      16
-rw-r--r--  src/rabbit_mirror_queue_slave.erl      6
-rw-r--r--  src/rabbit_policy.erl                  4
-rw-r--r--  src/rabbit_tests.erl                  56
-rw-r--r--  src/rabbit_variable_queue.erl         79
14 files changed, 269 insertions(+), 274 deletions(-)
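The dominant mechanical change across the hunks below is a reshaped backing-queue API: publish/4 becomes publish/5 with an explicit IsDelivered argument (replacing the delivered field of #message_properties{}), and the fetch/2 and drop/2 results no longer carry the remaining queue length, so callers ask len/1 instead. A minimal sketch of a caller adapting to the new shapes; the module and function names in the sketch are invented for illustration and are not part of the patch.

%% Illustrative only, not part of the commit. 'BQ' is any backing-queue
%% module implementing the callbacks changed below; 'BQS' is its state.
-module(bq_api_sketch).
-export([publish_example/4, fetch_example/2]).

publish_example(Msg, Props, BQ, BQS) ->
    %% publish/5 now takes IsDelivered explicitly (here: false).
    BQ:publish(Msg, Props, false, self(), BQS).

fetch_example(BQ, BQS) ->
    case BQ:fetch(true, BQS) of
        {empty, BQS1} ->
            {empty, BQS1};
        {{Msg, IsDelivered, AckTag}, BQS1} ->
            %% the fetch result lost its Remaining element; the remaining
            %% length is now obtained from len/1 when needed.
            {Msg, IsDelivered, AckTag, BQ:len(BQS1), BQS1}
    end.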
diff --git a/README b/README
index 90e99e62..67e3a66a 100644
--- a/README
+++ b/README
@@ -1 +1 @@
-Please see http://www.rabbitmq.com/build-server.html for build instructions.
\ No newline at end of file
+Please see http://www.rabbitmq.com/build-server.html for build instructions.
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index b2832b45..0ccb80bf 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -78,8 +78,7 @@
-record(event, {type, props, timestamp}).
--record(message_properties, {expiry, needs_confirming = false,
- delivered = false}).
+-record(message_properties, {expiry, needs_confirming = false}).
-record(plugin, {name, %% atom()
version, %% string()
diff --git a/src/rabbit.erl b/src/rabbit.erl
index c3a6d283..7b8348fc 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -400,13 +400,11 @@ status() ->
is_running() -> is_running(node()).
-is_running(Node) ->
- rabbit_nodes:is_running(Node, rabbit).
+is_running(Node) -> rabbit_nodes:is_running(Node, rabbit).
environment() ->
- lists:keysort(
- 1, [P || P = {K, _} <- application:get_all_env(rabbit),
- K =/= default_pass]).
+ lists:keysort(1, [P || P = {K, _} <- application:get_all_env(rabbit),
+ K =/= default_pass]).
rotate_logs(BinarySuffix) ->
Suffix = binary_to_list(BinarySuffix),
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 8ce1160c..7827b839 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -34,7 +34,7 @@
-export([start_mirroring/1, stop_mirroring/1]).
%% internal
--export([internal_declare/2, internal_delete/2, run_backing_queue/3,
+-export([internal_declare/2, internal_delete/1, run_backing_queue/3,
set_ram_duration_target/2, set_maximum_since_use/2]).
-include("rabbit.hrl").
@@ -156,11 +156,11 @@
-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(flush_all/2 :: (qpids(), pid()) -> 'ok').
--spec(internal_delete/2 ::
- (name(), pid()) -> rabbit_types:ok_or_error('not_found') |
- rabbit_types:connection_exit() |
- fun (() -> rabbit_types:ok_or_error('not_found') |
- rabbit_types:connection_exit())).
+-spec(internal_delete/1 ::
+ (name()) -> rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit() |
+ fun (() -> rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit())).
-spec(run_backing_queue/3 ::
(pid(), atom(),
(fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
@@ -257,7 +257,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
[ExistingQ = #amqqueue{pid = QPid}] ->
case rabbit_misc:is_process_alive(QPid) of
true -> rabbit_misc:const(ExistingQ);
- false -> TailFun = internal_delete(QueueName, QPid),
+ false -> TailFun = internal_delete(QueueName),
fun () -> TailFun(), ExistingQ end
end
end
@@ -563,7 +563,7 @@ internal_delete1(QueueName) ->
%% after the transaction.
rabbit_binding:remove_for_destination(QueueName).
-internal_delete(QueueName, QPid) ->
+internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
@@ -573,8 +573,7 @@ internal_delete(QueueName, QPid) ->
fun() ->
ok = T(),
ok = rabbit_event:notify(queue_deleted,
- [{pid, QPid},
- {name, QueueName}])
+ [{name, QueueName}])
end
end
end).
@@ -588,13 +587,13 @@ set_ram_duration_target(QPid, Duration) ->
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
-start_mirroring(QPid) -> ok = delegate_call(QPid, start_mirroring).
-stop_mirroring(QPid) -> ok = delegate_call(QPid, stop_mirroring).
+start_mirroring(QPid) -> ok = delegate_cast(QPid, start_mirroring).
+stop_mirroring(QPid) -> ok = delegate_cast(QPid, stop_mirroring).
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> QsDels =
- qlc:e(qlc:q([{{QName, Pid}, delete_queue(QName)} ||
+ qlc:e(qlc:q([{QName, delete_queue(QName)} ||
#amqqueue{name = QName, pid = Pid,
slave_pids = []}
<- mnesia:table(rabbit_queue),
@@ -607,10 +606,9 @@ on_node_down(Node) ->
fun () ->
T(),
lists:foreach(
- fun({QName, QPid}) ->
+ fun(QName) ->
ok = rabbit_event:notify(queue_deleted,
- [{pid, QPid},
- {name, QName}])
+ [{name, QName}])
end, Qs)
end
end).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b3c44a50..74717ace 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -85,7 +85,7 @@
%%----------------------------------------------------------------------------
-define(STATISTICS_KEYS,
- [pid,
+ [name,
policy,
exclusive_consumer_pid,
exclusive_consumer_tag,
@@ -101,16 +101,14 @@
]).
-define(CREATION_EVENT_KEYS,
- [pid,
- name,
+ [name,
durable,
auto_delete,
arguments,
owner_pid
]).
--define(INFO_KEYS,
- ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+-define(INFO_KEYS, [pid | ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [name]]).
%%----------------------------------------------------------------------------
@@ -122,11 +120,10 @@ info_keys() -> ?INFO_KEYS.
init(Q) ->
process_flag(trap_exit, true),
-
State = #q{q = Q#amqqueue{pid = self()},
exclusive_consumer = none,
has_had_consumers = false,
- backing_queue = backing_queue_module(Q),
+ backing_queue = undefined,
backing_queue_state = undefined,
active_consumers = queue:new(),
expires = undefined,
@@ -183,7 +180,7 @@ terminate(Reason, State = #q{q = #amqqueue{name = QName},
fun (BQS) ->
BQS1 = BQ:delete_and_terminate(Reason, BQS),
%% don't care if the internal delete doesn't return 'ok'.
- rabbit_amqqueue:internal_delete(QName, self()),
+ rabbit_amqqueue:internal_delete(QName),
BQS1
end, State).
@@ -193,7 +190,7 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
declare(Recover, From, State = #q{q = Q,
- backing_queue = BQ,
+ backing_queue = undefined,
backing_queue_state = undefined}) ->
case rabbit_amqqueue:internal_declare(Q, Recover =/= new) of
#amqqueue{} = Q1 ->
@@ -205,9 +202,11 @@ declare(Recover, From, State = #q{q = Q,
ok = rabbit_memory_monitor:register(
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
+ BQ = backing_queue_module(Q1),
BQS = bq_init(BQ, Q, Recover),
recovery_barrier(Recover),
- State1 = process_args(State#q{backing_queue_state = BQS}),
+ State1 = process_args(State#q{backing_queue = BQ,
+ backing_queue_state = BQS}),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(State1, #q.stats_timer,
@@ -277,7 +276,8 @@ terminate_shutdown(Fun, State) ->
case BQS of
undefined -> State1;
_ -> ok = rabbit_memory_monitor:deregister(self()),
- [emit_consumer_deleted(Ch, CTag)
+ QName = qname(State),
+ [emit_consumer_deleted(Ch, CTag, QName)
|| {Ch, CTag, _} <- consumers(State1)],
State1#q{backing_queue_state = Fun(BQS)}
end.
@@ -485,11 +485,10 @@ deliver_msg_to_consumer(DeliverFun,
{Stop, State1}.
deliver_from_queue_deliver(AckRequired, State) ->
- {{Message, IsDelivered, AckTag, _Remaining}, State1} =
- fetch(AckRequired, State),
+ {Result, State1} = fetch(AckRequired, State),
State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
drop_expired_messages(State1),
- {{Message, IsDelivered, AckTag}, BQ:is_empty(BQS), State2}.
+ {Result, BQ:is_empty(BQS), State2}.
confirm_messages([], State) ->
State;
@@ -542,8 +541,8 @@ run_message_queue(State) ->
State2.
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
- Props = #message_properties{delivered = Delivered},
- State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ Props, Delivered, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
case BQ:is_duplicate(Message, BQS) of
{false, BQS1} ->
deliver_msgs_to_consumers(
@@ -565,15 +564,15 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
Delivered, State) ->
{Confirm, State1} = send_or_record_confirm(Delivery, State),
- Props = message_properties(Message, Confirm, Delivered, State),
- case attempt_delivery(Delivery, Props, State1) of
+ Props = message_properties(Message, Confirm, State),
+ case attempt_delivery(Delivery, Props, Delivered, State1) of
{true, State2} ->
State2;
%% The next one is an optimisation
{false, State2 = #q{ttl = 0, dlx = undefined}} ->
discard(Delivery, State2);
{false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
- BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
+ BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
ensure_ttl_timer(Props#message_properties.expiry,
State2#q{backing_queue_state = BQS1})
end.
@@ -605,9 +604,9 @@ remove_consumer(ChPid, ConsumerTag, Queue) ->
(CP /= ChPid) or (CTag /= ConsumerTag)
end, Queue).
-remove_consumers(ChPid, Queue) ->
+remove_consumers(ChPid, Queue, QName) ->
queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid ->
- emit_consumer_deleted(ChPid, CTag),
+ emit_consumer_deleted(ChPid, CTag, QName),
false;
(_) ->
true
@@ -648,7 +647,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
blocked_consumers = Blocked} ->
- _ = remove_consumers(ChPid, Blocked), %% for stats emission
+ QName = qname(State),
+ _ = remove_consumers(ChPid, Blocked, QName), %% for stats emission
ok = erase_ch_record(C),
State1 = State#q{
exclusive_consumer = case Holder of
@@ -656,7 +656,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
Other -> Other
end,
active_consumers = remove_consumers(
- ChPid, State#q.active_consumers),
+ ChPid, State#q.active_consumers,
+ QName),
senders = Senders1},
case should_auto_delete(State1) of
true -> {stop, State1};
@@ -704,10 +705,9 @@ subtract_acks(ChPid, AckTags, State, Fun) ->
Fun(State)
end.
-message_properties(Message, Confirm, Delivered, #q{ttl = TTL}) ->
+message_properties(Message, Confirm, #q{ttl = TTL}) ->
#message_properties{expiry = calculate_msg_expiry(Message, TTL),
- needs_confirming = Confirm == eventually,
- delivered = Delivered}.
+ needs_confirming = Confirm == eventually}.
calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
#content{properties = Props} =
@@ -955,19 +955,19 @@ emit_stats(State) ->
emit_stats(State, Extra) ->
rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)).
-emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) ->
+emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired, QName) ->
rabbit_event:notify(consumer_created,
[{consumer_tag, ConsumerTag},
{exclusive, Exclusive},
{ack_required, AckRequired},
{channel, ChPid},
- {queue, self()}]).
+ {queue, QName}]).
-emit_consumer_deleted(ChPid, ConsumerTag) ->
+emit_consumer_deleted(ChPid, ConsumerTag, QName) ->
rabbit_event:notify(consumer_deleted,
[{consumer_tag, ConsumerTag},
{channel, ChPid},
- {queue, self()}]).
+ {queue, QName}]).
%%----------------------------------------------------------------------------
@@ -1061,8 +1061,8 @@ handle_call({basic_get, ChPid, NoAck}, _From,
case fetch(AckRequired, drop_expired_messages(State1)) of
{empty, State2} ->
reply(empty, State2);
- {{Message, IsDelivered, AckTag, Remaining}, State2} ->
- State3 =
+ {{Message, IsDelivered, AckTag}, State2} ->
+ State3 = #q{backing_queue = BQ, backing_queue_state = BQS} =
case AckRequired of
true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
ChAckTags1 = sets:add_element(AckTag, ChAckTags),
@@ -1071,14 +1071,13 @@ handle_call({basic_get, ChPid, NoAck}, _From,
false -> State2
end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
- reply({ok, Remaining, Msg}, State3)
+ reply({ok, BQ:len(BQS), Msg}, State3)
end;
handle_call({basic_consume, NoAck, ChPid, Limiter,
ConsumerTag, ExclusiveConsume, OkMsg},
- _From, State = #q{exclusive_consumer = ExistingHolder}) ->
- case check_exclusive_access(ExistingHolder, ExclusiveConsume,
- State) of
+ _From, State = #q{exclusive_consumer = Holder}) ->
+ case check_exclusive_access(Holder, ExclusiveConsume, State) of
in_use ->
reply({error, exclusive_consume_unavailable}, State);
ok ->
@@ -1087,7 +1086,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
- true -> ExistingHolder
+ true -> Holder
end,
State1 = State#q{has_had_consumers = true,
exclusive_consumer = ExclusiveConsumer},
@@ -1102,7 +1101,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
run_message_queue(State1#q{active_consumers = AC1})
end,
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck),
+ not NoAck, qname(State2)),
reply(ok, State2)
end;
@@ -1113,7 +1112,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
not_found ->
reply(ok, State);
C = #cr{blocked_consumers = Blocked} ->
- emit_consumer_deleted(ChPid, ConsumerTag),
+ emit_consumer_deleted(ChPid, ConsumerTag, qname(State)),
Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked),
update_consumer_count(C#cr{blocked_consumers = Blocked1}, -1),
State1 = State#q{
@@ -1123,7 +1122,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
end,
active_consumers = remove_consumer(
ChPid, ConsumerTag,
- State#q.active_consumers)},
+ State#q.active_consumers)},
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
true -> stop(From, ok, State1)
@@ -1154,31 +1153,16 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
noreply(requeue(AckTags, ChPid, State));
-handle_call(start_mirroring, _From, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- %% lookup again to get policy for init_with_existing_bq
- {ok, Q} = rabbit_amqqueue:lookup(qname(State)),
- true = BQ =/= rabbit_mirror_queue_master, %% assertion
- BQ1 = rabbit_mirror_queue_master,
- BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS),
- reply(ok, State#q{backing_queue = BQ1,
- backing_queue_state = BQS1});
-
-handle_call(stop_mirroring, _From, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- BQ = rabbit_mirror_queue_master, %% assertion
- {BQ1, BQS1} = BQ:stop_mirroring(BQS),
- reply(ok, State#q{backing_queue = BQ1,
- backing_queue_state = BQS1});
-
handle_call(force_event_refresh, _From,
State = #q{exclusive_consumer = Exclusive}) ->
rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
+ QName = qname(State),
case Exclusive of
- none -> [emit_consumer_created(Ch, CTag, false, AckRequired) ||
+ none -> [emit_consumer_created(
+ Ch, CTag, false, AckRequired, QName) ||
{Ch, CTag, AckRequired} <- consumers(State)];
{Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State),
- emit_consumer_created(Ch, CTag, true, AckRequired)
+ emit_consumer_created(Ch, CTag, true, AckRequired, QName)
end,
reply(ok, State).
@@ -1300,6 +1284,23 @@ handle_cast({dead_letter, Msgs, Reason}, State = #q{dlx = XName}) ->
cleanup_after_confirm([AckTag || {_, AckTag} <- Msgs], State)
end;
+handle_cast(start_mirroring, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ %% lookup again to get policy for init_with_existing_bq
+ {ok, Q} = rabbit_amqqueue:lookup(qname(State)),
+ true = BQ =/= rabbit_mirror_queue_master, %% assertion
+ BQ1 = rabbit_mirror_queue_master,
+ BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS),
+ noreply(State#q{backing_queue = BQ1,
+ backing_queue_state = BQS1});
+
+handle_cast(stop_mirroring, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ BQ = rabbit_mirror_queue_master, %% assertion
+ {BQ1, BQS1} = BQ:stop_mirroring(BQS),
+ noreply(State#q{backing_queue = BQ1,
+ backing_queue_state = BQS1});
+
handle_cast(wake_up, State) ->
noreply(State).
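In the queue process above, statistics and consumer events are now identified by queue name rather than queue pid (see the ?STATISTICS_KEYS and ?CREATION_EVENT_KEYS changes and the extra QName argument to emit_consumer_created/emit_consumer_deleted). A hedged sketch of what an event consumer sees, with an invented module name; only the name-for-pid key swap is taken from the patch.

%% Sketch only (invented consumer module): queue events are now keyed by the
%% queue's name, a #resource{} record, instead of by its pid.
-module(queue_event_sketch).
-include("rabbit.hrl").
-export([handle_event/1]).

handle_event(#event{type = queue_created, props = Props}) ->
    QName = proplists:get_value(name, Props),   %% previously keyed on 'pid'
    rabbit_log:info("queue created: ~s~n", [rabbit_misc:rs(QName)]);
handle_event(_Event) ->
    ok.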
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index ffa716b6..e2945e1d 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -26,13 +26,9 @@
-type(msg_ids() :: [rabbit_types:msg_id()]).
-type(fetch_result(Ack) ::
- ('empty' |
- %% Message, IsDelivered, AckTag, Remaining_Len
- {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})).
+ ('empty' | {rabbit_types:basic_message(), boolean(), Ack})).
-type(drop_result(Ack) ::
- ('empty' |
- %% MessageId, AckTag, Remaining_Len
- {rabbit_types:msg_id(), Ack, non_neg_integer()})).
+ ('empty' | {rabbit_types:msg_id(), Ack})).
-type(attempt_recovery() :: boolean()).
-type(purged_msg_count() :: non_neg_integer()).
-type(async_callback() ::
@@ -82,8 +78,8 @@
%% Publish a message.
-callback publish(rabbit_types:basic_message(),
- rabbit_types:message_properties(), pid(), state()) ->
- state().
+ rabbit_types:message_properties(), boolean(), pid(),
+ state()) -> state().
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
@@ -224,7 +220,7 @@
behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
- {delete_and_terminate, 2}, {purge, 1}, {publish, 4},
+ {delete_and_terminate, 2}, {purge, 1}, {publish, 5},
{publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3},
{fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index 764911b9..ed8f797a 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -286,12 +286,12 @@ next_state(S, Res, {call, ?BQMOD, fold, _Args}) ->
postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) ->
#state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S,
case Res of
- {{MsgFetched, _IsDelivered, AckTag, RemainingLen}, _BQ} ->
+ {{MsgFetched, _IsDelivered, AckTag}, _BQ} ->
{_SeqId, {_MsgProps, Msg}} = gb_trees:smallest(Messages),
MsgFetched =:= Msg andalso
not proplists:is_defined(AckTag, Acks) andalso
not gb_sets:is_element(AckTag, Confrms) andalso
- RemainingLen =:= Len - 1;
+ Len =/= 0;
{empty, _BQ} ->
Len =:= 0
end;
@@ -299,14 +299,14 @@ postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) ->
postcondition(S, {call, ?BQMOD, drop, _Args}, Res) ->
#state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S,
case Res of
- {{MsgIdFetched, AckTag, RemainingLen}, _BQ} ->
+ {{MsgIdFetched, AckTag}, _BQ} ->
{_SeqId, {_MsgProps, Msg}} = gb_trees:smallest(Messages),
MsgId = eval({call, erlang, element,
[?RECORD_INDEX(id, basic_message), Msg]}),
MsgIdFetched =:= MsgId andalso
not proplists:is_defined(AckTag, Acks) andalso
not gb_sets:is_element(AckTag, Confrms) andalso
- RemainingLen =:= Len - 1;
+ Len =/= 0;
{empty, _BQ} ->
Len =:= 0
end;
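Because fetch/2 and drop/2 results no longer include the remaining length, the QC postconditions above can only assert Len =/= 0; the precise remaining-length check survives only where callers query the length themselves. A sketch of recovering the old check, assuming the rabbit_variable_queue functions exercised elsewhere in this patch; the module and function names are invented.

%% Sketch only: re-creating the former "RemainingLen =:= Len - 1" assertion
%% by asking the queue for its length before and after the fetch.
-module(fetch_len_sketch).
-export([fetch_and_check/1]).

fetch_and_check(VQ0) ->
    LenBefore = rabbit_variable_queue:len(VQ0),
    {{_Msg, _IsDelivered, _AckTag}, VQ1} = rabbit_variable_queue:fetch(true, VQ0),
    true = (rabbit_variable_queue:len(VQ1) =:= LenBefore - 1),
    VQ1.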
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b97af6d8..b1ef3b6b 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -35,8 +35,9 @@
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
conn_name, limiter, tx_status, next_tag, unacked_message_q,
uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user,
- virtual_host, most_recently_declared_queue, queue_monitors,
- consumer_mapping, blocking, queue_consumers, delivering_queues,
+ virtual_host, most_recently_declared_queue,
+ queue_names, queue_monitors, consumer_mapping,
+ blocking, queue_consumers, delivering_queues,
queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
unconfirmed, confirmed, capabilities, trace_state}).
@@ -194,6 +195,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
+ queue_names = dict:new(),
queue_monitors = pmon:new(),
consumer_mapping = dict:new(),
blocking = sets:new(),
@@ -334,9 +336,13 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State3 = handle_consuming_queue_down(QPid, State2),
State4 = handle_delivering_queue_down(QPid, State3),
credit_flow:peer_down(QPid),
- erase_queue_stats(QPid),
- noreply(State4#ch{queue_monitors = pmon:erase(
- QPid, State4#ch.queue_monitors)});
+ #ch{queue_names = QNames, queue_monitors = QMons} = State4,
+ case dict:find(QPid, QNames) of
+ {ok, QName} -> erase_queue_stats(QName);
+ error -> ok
+ end,
+ noreply(State4#ch{queue_names = dict:erase(QPid, QNames),
+ queue_monitors = pmon:erase(QPid, QMons)});
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -677,7 +683,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
- Msg = {_QName, QPid, _MsgId, Redelivered,
+ Msg = {QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
routing_keys = [RoutingKey | _CcRoutes],
content = Content}}} ->
@@ -689,7 +695,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
routing_key = RoutingKey,
message_count = MessageCount},
Content),
- State1 = monitor_delivering_queue(NoAck, QPid, State),
+ State1 = monitor_delivering_queue(NoAck, QPid, QName, State),
{noreply, record_sent(none, not(NoAck), Msg, State1)};
empty ->
{reply, #'basic.get_empty'{}, State}
@@ -728,10 +734,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
consumer_tag = ActualConsumerTag})),
Q}
end) of
- {ok, Q = #amqqueue{pid = QPid}} ->
+ {ok, Q = #amqqueue{pid = QPid, name = QName}} ->
CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping),
State1 = monitor_delivering_queue(
- NoAck, QPid, State#ch{consumer_mapping = CM1}),
+ NoAck, QPid, QName,
+ State#ch{consumer_mapping = CM1}),
{noreply,
case NoWait of
true -> consumer_monitor(ActualConsumerTag, State1);
@@ -1126,9 +1133,12 @@ consumer_monitor(ConsumerTag,
State
end.
-monitor_delivering_queue(NoAck, QPid, State = #ch{queue_monitors = QMons,
- delivering_queues = DQ}) ->
- State#ch{queue_monitors = pmon:monitor(QPid, QMons),
+monitor_delivering_queue(NoAck, QPid, QName,
+ State = #ch{queue_names = QNames,
+ queue_monitors = QMons,
+ delivering_queues = DQ}) ->
+ State#ch{queue_names = dict:store(QPid, QName, QNames),
+ queue_monitors = pmon:monitor(QPid, QMons),
delivering_queues = case NoAck of
true -> DQ;
false -> sets:add_element(QPid, DQ)
@@ -1233,18 +1243,18 @@ reject(Requeue, Acked, Limiter) ->
ok = notify_limiter(Limiter, Acked).
record_sent(ConsumerTag, AckRequired,
- Msg = {_QName, QPid, MsgId, Redelivered, _Message},
+ Msg = {QName, QPid, MsgId, Redelivered, _Message},
State = #ch{unacked_message_q = UAMQ,
next_tag = DeliveryTag,
trace_state = TraceState}) ->
- incr_stats([{queue_stats, QPid, 1}], case {ConsumerTag, AckRequired} of
- {none, true} -> get;
- {none, false} -> get_no_ack;
- {_ , true} -> deliver;
- {_ , false} -> deliver_no_ack
- end, State),
+ incr_stats([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of
+ {none, true} -> get;
+ {none, false} -> get_no_ack;
+ {_ , true} -> deliver;
+ {_ , false} -> deliver_no_ack
+ end, State),
case Redelivered of
- true -> incr_stats([{queue_stats, QPid, 1}], redeliver, State);
+ true -> incr_stats([{queue_stats, QName, 1}], redeliver, State);
false -> ok
end,
rabbit_trace:tap_trace_out(Msg, TraceState),
@@ -1277,11 +1287,15 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
precondition_failed("unknown delivery tag ~w", [DeliveryTag])
end.
-ack(Acked, State) ->
+ack(Acked, State = #ch{queue_names = QNames}) ->
Incs = fold_per_queue(
fun (QPid, MsgIds, L) ->
ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
- [{queue_stats, QPid, length(MsgIds)} | L]
+ case dict:find(QPid, QNames) of
+ {ok, QName} -> Count = length(MsgIds),
+ [{queue_stats, QName, Count} | L];
+ error -> L
+ end
end, [], Acked),
ok = notify_limiter(State#ch.limiter, Acked),
incr_stats(Incs, ack, State).
@@ -1339,23 +1353,42 @@ notify_limiter(Limiter, Acked) ->
deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
exchange_name = XName},
msg_seq_no = MsgSeqNo},
- QNames}, State) ->
- {RoutingRes, DeliveredQPids} =
- rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery),
- State1 = State#ch{queue_monitors =
- pmon:monitor_all(DeliveredQPids,
- State#ch.queue_monitors)},
- State2 = process_routing_result(RoutingRes, DeliveredQPids,
- XName, MsgSeqNo, Message, State1),
+ DelQNames}, State = #ch{queue_names = QNames,
+ queue_monitors = QMons}) ->
+ Qs = rabbit_amqqueue:lookup(DelQNames),
+ {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver_flow(Qs, Delivery),
+ %% The pmon:monitor_all/2 monitors all queues to which we
+ %% delivered. But we want to monitor even queues we didn't deliver
+ %% to, since we need their 'DOWN' messages to clean
+ %% queue_names. So we also need to monitor each QPid from
+ %% queues. But that only gets the masters (which is fine for
+ %% cleaning queue_names), so we need the union of both.
+ %%
+ %% ...and we need to add even non-delivered queues to queue_names
+ %% since alternative algorithms to update queue_names less
+ %% frequently would in fact be more expensive in the common case.
+ {QNames1, QMons1} =
+ lists:foldl(fun (#amqqueue{pid = QPid, name = QName},
+ {QNames0, QMons0}) ->
+ {case dict:is_key(QPid, QNames0) of
+ true -> QNames0;
+ false -> dict:store(QPid, QName, QNames0)
+ end, pmon:monitor(QPid, QMons0)}
+ end, {QNames, pmon:monitor_all(DeliveredQPids, QMons)}, Qs),
+ State1 = process_routing_result(RoutingRes, DeliveredQPids,
+ XName, MsgSeqNo, Message,
+ State#ch{queue_names = QNames1,
+ queue_monitors = QMons1}),
incr_stats([{exchange_stats, XName, 1} |
- [{queue_exchange_stats, {QPid, XName}, 1} ||
- QPid <- DeliveredQPids]], publish, State2),
- State2.
+ [{queue_exchange_stats, {QName, XName}, 1} ||
+ QPid <- DeliveredQPids,
+ {ok, QName} <- [dict:find(QPid, QNames1)]]],
+ publish, State1),
+ State1.
process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_route),
- incr_stats([{exchange_stats, Msg#basic_message.exchange_name, 1}],
- return_unroutable, State),
+ incr_stats([{exchange_stats, XName, 1}], return_unroutable, State),
record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
record_confirm(MsgSeqNo, XName, State);
@@ -1489,24 +1522,23 @@ emit_stats(State) ->
emit_stats(State, []).
emit_stats(State, Extra) ->
- CoarseStats = infos(?STATISTICS_KEYS, State),
+ Coarse = infos(?STATISTICS_KEYS, State),
case rabbit_event:stats_level(State, #ch.stats_timer) of
- coarse ->
- rabbit_event:notify(channel_stats, Extra ++ CoarseStats);
- fine ->
- FineStats =
- [{channel_queue_stats,
- [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]},
- {channel_exchange_stats,
- [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]},
- {channel_queue_exchange_stats,
- [{QX, Stats} ||
- {{queue_exchange_stats, QX}, Stats} <- get()]}],
- rabbit_event:notify(channel_stats,
- Extra ++ CoarseStats ++ FineStats)
+ coarse -> rabbit_event:notify(channel_stats, Extra ++ Coarse);
+ fine -> Fine = [{channel_queue_stats,
+ [{QName, Stats} ||
+ {{queue_stats, QName}, Stats} <- get()]},
+ {channel_exchange_stats,
+ [{XName, Stats} ||
+ {{exchange_stats, XName}, Stats} <- get()]},
+ {channel_queue_exchange_stats,
+ [{QX, Stats} ||
+ {{queue_exchange_stats, QX}, Stats} <- get()]}],
+ rabbit_event:notify(channel_stats, Extra ++ Coarse ++ Fine)
end.
-erase_queue_stats(QPid) ->
- erase({queue_stats, QPid}),
+erase_queue_stats(QName) ->
+ erase({queue_stats, QName}),
[erase({queue_exchange_stats, QX}) ||
- {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0].
+ {{queue_exchange_stats, QX = {QName0, _}}, _} <- get(),
+ QName0 =:= QName].
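The channel above now keeps a queue_names dict mapping queue pid to queue name, so that stats, acks and 'DOWN' handling can be keyed by name even after the queue process is gone. A minimal illustration of that lookup pattern; the helper name is invented and not part of the patch.

%% Sketch only (invented helper): resolve a queue name from a monitored pid,
%% skipping stats when the pid was never recorded, mirroring the dict:find/2
%% pattern used in the channel code above.
-module(qname_lookup_sketch).
-export([qname_for_pid/2]).

qname_for_pid(QPid, QNames) ->
    case dict:find(QPid, QNames) of
        {ok, QName} -> {ok, QName};
        error       -> unknown
    end.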
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 15aeea01..c8a361b1 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -17,7 +17,7 @@
-module(rabbit_mirror_queue_master).
-export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/4, publish_delivered/4,
+ purge/1, publish/5, publish_delivered/4,
discard/3, fetch/2, drop/2, ack/2,
requeue/2, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/3, set_ram_duration_target/2, ram_duration/1,
@@ -38,7 +38,6 @@
coordinator,
backing_queue,
backing_queue_state,
- set_delivered,
seen_status,
confirmed,
ack_msg_id,
@@ -55,7 +54,6 @@
coordinator :: pid(),
backing_queue :: atom(),
backing_queue_state :: any(),
- set_delivered :: non_neg_integer(),
seen_status :: dict(),
confirmed :: [rabbit_guid:guid()],
ack_msg_id :: dict(),
@@ -114,7 +112,6 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = 0,
seen_status = dict:new(),
confirmed = [],
ack_msg_id = dict:new(),
@@ -136,8 +133,8 @@ terminate({shutdown, dropped} = Reason,
%% in without this node being restarted. Thus we must do the full
%% blown delete_and_terminate now, but only locally: we do not
%% broadcast delete_and_terminate.
- State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
- set_delivered = 0 };
+ State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)};
+
terminate(Reason,
State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
%% Backing queue termination. The queue is going down but
@@ -148,8 +145,7 @@ terminate(Reason,
delete_and_terminate(Reason, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
stop_all_slaves(Reason, State),
- State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
- set_delivered = 0 }.
+ State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}.
stop_all_slaves(Reason, #state{gm = GM}) ->
Info = gm:info(GM),
@@ -175,17 +171,16 @@ purge(State = #state { gm = GM,
backing_queue_state = BQS }) ->
ok = gm:broadcast(GM, {drop, 0, BQ:len(BQS), false}),
{Count, BQS1} = BQ:purge(BQS),
- {Count, State #state { backing_queue_state = BQS1,
- set_delivered = 0 }}.
+ {Count, State #state { backing_queue_state = BQS1 }}.
-publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid,
+publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid,
State = #state { gm = GM,
seen_status = SS,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}),
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQS),
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
@@ -224,7 +219,6 @@ discard(MsgId, ChPid, State = #state { gm = GM,
dropwhile(Pred, AckRequired,
State = #state{gm = GM,
backing_queue = BQ,
- set_delivered = SetDelivered,
backing_queue_state = BQS }) ->
Len = BQ:len(BQS),
{Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
@@ -234,9 +228,7 @@ dropwhile(Pred, AckRequired,
0 -> ok;
_ -> ok = gm:broadcast(GM, {drop, Len1, Dropped, AckRequired})
end,
- SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
- {Next, Msgs, State #state { backing_queue_state = BQS1,
- set_delivered = SetDelivered1 } }.
+ {Next, Msgs, State #state { backing_queue_state = BQS1 } }.
drain_confirmed(State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -268,34 +260,25 @@ drain_confirmed(State = #state { backing_queue = BQ,
seen_status = SS1,
confirmed = [] }}.
-fetch(AckRequired, State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- set_delivered = SetDelivered }) ->
+fetch(AckRequired, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
State1 = State #state { backing_queue_state = BQS1 },
case Result of
empty ->
{Result, State1};
- {Message, IsDelivered, AckTag, Remaining} ->
- ok = gm:broadcast(GM, {drop, Remaining, 1, AckRequired}),
- IsDelivered1 = IsDelivered orelse SetDelivered > 0,
- {{Message, IsDelivered1, AckTag, Remaining},
- drop(Message#basic_message.id, AckTag, State1)}
+ {#basic_message{id = MsgId}, _IsDelivered, AckTag} ->
+ {Result, drop(MsgId, AckTag, State1)}
end.
-drop(AckRequired, State = #state { gm = GM,
- backing_queue = BQ,
+drop(AckRequired, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
{Result, BQS1} = BQ:drop(AckRequired, BQS),
State1 = State #state { backing_queue_state = BQS1 },
- case Result of
- empty ->
- {Result, State1};
- {MsgId, AckTag, Remaining} ->
- ok = gm:broadcast(GM, {drop, Remaining, 1, AckRequired}),
- {Result, drop(MsgId, AckTag, State1)}
- end.
+ {Result, case Result of
+ empty -> State1;
+ {MsgId, AckTag} -> drop(MsgId, AckTag, State1)
+ end}.
ack(AckTags, State = #state { gm = GM,
backing_queue = BQ,
@@ -423,7 +406,6 @@ promote_backing_queue_state(CPid, BQ, BQS, GM, AckTags, SeenStatus, KS) ->
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS1,
- set_delivered = Len,
seen_status = SeenStatus,
confirmed = [],
ack_msg_id = dict:new(),
@@ -458,10 +440,12 @@ depth_fun() ->
%% Helpers
%% ---------------------------------------------------------------------------
-drop(MsgId, AckTag, State = #state { set_delivered = SetDelivered,
- ack_msg_id = AM }) ->
- State #state { set_delivered = lists:max([0, SetDelivered - 1]),
- ack_msg_id = maybe_store_acktag(AckTag, MsgId, AM) }.
+drop(MsgId, AckTag, State = #state { ack_msg_id = AM,
+ gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckTag =/= undefined}),
+ State #state { ack_msg_id = maybe_store_acktag(AckTag, MsgId, AM) }.
maybe_store_acktag(undefined, _MsgId, AM) -> AM;
maybe_store_acktag(AckTag, MsgId, AM) -> dict:store(AckTag, MsgId, AM).
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 2b3bd027..58f20476 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -133,11 +133,11 @@ on_node_up() ->
end
end, [], rabbit_queue)
end),
- [{ok, _} = add_mirror(QName, node()) || QName <- QNames],
+ [add_mirror(QName, node()) || QName <- QNames],
ok.
drop_mirrors(QName, Nodes) ->
- [{ok, _} = drop_mirror(QName, Node) || Node <- Nodes],
+ [drop_mirror(QName, Node) || Node <- Nodes],
ok.
drop_mirror(QName, MirrorNode) ->
@@ -159,7 +159,7 @@ drop_mirror(QName, MirrorNode) ->
end).
add_mirrors(QName, Nodes) ->
- [{ok, _} = add_mirror(QName, Node) || Node <- Nodes],
+ [add_mirror(QName, Node) || Node <- Nodes],
ok.
add_mirror(QName, MirrorNode) ->
@@ -183,15 +183,7 @@ start_child(Name, MirrorNode, Q) ->
fun () ->
rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q])
end) of
- {ok, undefined} ->
- %% this means the mirror process was
- %% already running on the given node.
- {ok, already_mirrored};
- {ok, down} ->
- %% Node went down between us deciding to start a mirror
- %% and actually starting it. Which is fine.
- {ok, node_down};
- {ok, SPid} ->
+ {ok, SPid} when is_pid(SPid) ->
rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n",
[rabbit_misc:rs(Name), MirrorNode, SPid]),
{ok, started};
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 3ad8eb77..9354f485 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -318,7 +318,6 @@ prioritise_cast(Msg, _State) ->
{set_maximum_since_use, _Age} -> 8;
{run_backing_queue, _Mod, _Fun} -> 6;
{gm, _Msg} -> 5;
- {post_commit, _Txn, _AckTags} -> 4;
_ -> 0
end.
@@ -703,7 +702,7 @@ process_instruction({publish, ChPid, MsgProps,
Msg = #basic_message { id = MsgId }}, State) ->
State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
publish_or_discard(published, ChPid, MsgId, State),
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ BQS1 = BQ:publish(Msg, MsgProps, true, ChPid, BQS),
{ok, State1 #state { backing_queue_state = BQS1 }};
process_instruction({publish_delivered, ChPid, MsgProps,
Msg = #basic_message { id = MsgId }}, State) ->
@@ -727,8 +726,7 @@ process_instruction({drop, Length, Dropped, AckRequired},
end,
State1 = lists:foldl(
fun (const, StateN = #state{backing_queue_state = BQSN}) ->
- {{MsgId, AckTag, _Remaining}, BQSN1} =
- BQ:drop(AckRequired, BQSN),
+ {{MsgId, AckTag}, BQSN1} = BQ:drop(AckRequired, BQSN),
maybe_store_ack(
AckRequired, MsgId, AckTag,
StateN #state { backing_queue_state = BQSN1 })
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 2717cc92..2c997f16 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -166,8 +166,8 @@ update_policies(VHost) ->
[update_queue(Q, Policies) ||
Q <- rabbit_amqqueue:list(VHost)]}
end),
- [notify(X) || X <- Xs],
- [notify(Q) || Q <- Qs],
+ [catch notify(X) || X <- Xs],
+ [catch notify(Q) || Q <- Qs],
ok.
update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index d5c096a1..bb4ddceb 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1286,8 +1286,7 @@ test_statistics() ->
QName = receive #'queue.declare_ok'{queue = Q0} -> Q0
after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok)
end,
- {ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)),
- QPid = Q#amqqueue.pid,
+ QRes = rabbit_misc:r(<<"/">>, queue, QName),
X = rabbit_misc:r(<<"/">>, exchange, <<"">>),
rabbit_tests_event_receiver:start(self(), [node()], [channel_stats]),
@@ -1311,9 +1310,9 @@ test_statistics() ->
length(proplists:get_value(
channel_queue_exchange_stats, E)) > 0
end),
- [{QPid,[{get,1}]}] = proplists:get_value(channel_queue_stats, Event2),
+ [{QRes, [{get,1}]}] = proplists:get_value(channel_queue_stats, Event2),
[{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event2),
- [{{QPid,X},[{publish,1}]}] =
+ [{{QRes,X},[{publish,1}]}] =
proplists:get_value(channel_queue_exchange_stats, Event2),
%% Check the stats remove stuff on queue deletion
@@ -1338,31 +1337,31 @@ test_refresh_events(SecondaryNode) ->
[channel_created, queue_created]),
{_Writer, Ch} = test_spawn(),
- expect_events(Ch, channel_created),
+ expect_events(pid, Ch, channel_created),
rabbit_channel:shutdown(Ch),
{_Writer2, Ch2} = test_spawn(SecondaryNode),
- expect_events(Ch2, channel_created),
+ expect_events(pid, Ch2, channel_created),
rabbit_channel:shutdown(Ch2),
- {new, #amqqueue { pid = QPid } = Q} =
+ {new, #amqqueue{name = QName} = Q} =
rabbit_amqqueue:declare(test_queue(), false, false, [], none),
- expect_events(QPid, queue_created),
+ expect_events(name, QName, queue_created),
rabbit_amqqueue:delete(Q, false, false),
rabbit_tests_event_receiver:stop(),
passed.
-expect_events(Pid, Type) ->
- expect_event(Pid, Type),
+expect_events(Tag, Key, Type) ->
+ expect_event(Tag, Key, Type),
rabbit:force_event_refresh(),
- expect_event(Pid, Type).
+ expect_event(Tag, Key, Type).
-expect_event(Pid, Type) ->
+expect_event(Tag, Key, Type) ->
receive #event{type = Type, props = Props} ->
- case pget(pid, Props) of
- Pid -> ok;
- _ -> expect_event(Pid, Type)
+ case pget(Tag, Props) of
+ Key -> ok;
+ _ -> expect_event(Tag, Key, Type)
end
after ?TIMEOUT -> throw({failed_to_receive_event, Type})
end.
@@ -2231,15 +2230,16 @@ variable_queue_publish(IsPersistent, Count, PropFun, PayloadFun, VQ) ->
false -> 1
end},
PayloadFun(N)),
- PropFun(N, #message_properties{}), self(), VQN)
+ PropFun(N, #message_properties{}), false, self(), VQN)
end, VQ, lists:seq(1, Count)).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
lists:foldl(fun (N, {VQN, AckTagsAcc}) ->
Rem = Len - N,
{{#basic_message { is_persistent = IsPersistent },
- IsDelivered, AckTagN, Rem}, VQM} =
+ IsDelivered, AckTagN}, VQM} =
rabbit_variable_queue:fetch(true, VQN),
+ Rem = rabbit_variable_queue:len(VQM),
{VQM, [AckTagN | AckTagsAcc]}
end, {VQ, []}, lists:seq(1, Count)).
@@ -2346,7 +2346,7 @@ test_variable_queue_requeue(VQ0) ->
VQM
end, VQ4, Subset),
VQ6 = lists:foldl(fun (AckTag, VQa) ->
- {{#basic_message{}, true, AckTag, _}, VQb} =
+ {{#basic_message{}, true, AckTag}, VQb} =
rabbit_variable_queue:fetch(true, VQa),
VQb
end, VQ5, lists:reverse(Acks)),
@@ -2386,14 +2386,16 @@ test_drop(VQ0) ->
%% start by sending a message
VQ1 = variable_queue_publish(false, 1, VQ0),
%% drop message with AckRequired = true
- {{MsgId, AckTag, 0}, VQ2} = rabbit_variable_queue:drop(true, VQ1),
+ {{MsgId, AckTag}, VQ2} = rabbit_variable_queue:drop(true, VQ1),
+ true = rabbit_variable_queue:is_empty(VQ2),
true = AckTag =/= undefined,
%% drop again -> empty
{empty, VQ3} = rabbit_variable_queue:drop(false, VQ2),
%% requeue
{[MsgId], VQ4} = rabbit_variable_queue:requeue([AckTag], VQ3),
%% drop message with AckRequired = false
- {{MsgId, undefined, 0}, VQ5} = rabbit_variable_queue:drop(false, VQ4),
+ {{MsgId, undefined}, VQ5} = rabbit_variable_queue:drop(false, VQ4),
+ true = rabbit_variable_queue:is_empty(VQ5),
VQ5.
test_dropwhile(VQ0) ->
@@ -2412,7 +2414,7 @@ test_dropwhile(VQ0) ->
%% fetch five now
VQ3 = lists:foldl(fun (_N, VQN) ->
- {{#basic_message{}, _, _, _}, VQM} =
+ {{#basic_message{}, _, _}, VQM} =
rabbit_variable_queue:fetch(false, VQN),
VQM
end, VQ2, lists:seq(6, Count)),
@@ -2465,7 +2467,8 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
VQ0;
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
- {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
+ {{_Msg, false, AckTag}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
+ Len = rabbit_variable_queue:len(VQ2),
{_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2),
publish_fetch_and_ack(N-1, Len, VQ3).
@@ -2530,8 +2533,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5),
VQ7 = variable_queue_init(test_amqqueue(true), true),
- {{_Msg1, true, _AckTag1, Count1}, VQ8} =
- rabbit_variable_queue:fetch(true, VQ7),
+ {{_Msg1, true, _AckTag1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7),
+ Count1 = rabbit_variable_queue:len(VQ8),
VQ9 = variable_queue_publish(false, 1, VQ8),
VQ10 = rabbit_variable_queue:set_ram_duration_target(0, VQ9),
{VQ11, _AckTags2} = variable_queue_fetch(Count1, true, true, Count, VQ10),
@@ -2578,10 +2581,11 @@ test_queue_recover() ->
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
VQ1 = variable_queue_init(Q, true),
- {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
+ {{_Msg1, true, _AckTag1}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
+ CountMinusOne = rabbit_variable_queue:len(VQ2),
_VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2),
- rabbit_amqqueue:internal_delete(QName, QPid1)
+ rabbit_amqqueue:internal_delete(QName)
end),
passed.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index b826413a..7fbac782 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -17,7 +17,7 @@
-module(rabbit_variable_queue).
-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
- publish/4, publish_delivered/4, discard/3, drain_confirmed/1,
+ publish/5, publish_delivered/4, discard/3, drain_confirmed/1,
dropwhile/3, fetch/2, drop/2, ack/2, requeue/2, fold/3, len/1,
is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
@@ -348,7 +348,7 @@
q4 :: ?QUEUE:?QUEUE(),
next_seq_id :: seq_id(),
pending_ack :: gb_tree(),
- ram_ack_index :: gb_tree(),
+ ram_ack_index :: gb_set(),
index_state :: any(),
msg_store_clients :: 'undefined' | {{any(), binary()},
{any(), binary()}},
@@ -520,16 +520,16 @@ purge(State = #vqstate { q4 = Q4,
publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
MsgProps = #message_properties { needs_confirming = NeedsConfirming },
- _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
- next_seq_id = SeqId,
- len = Len,
- in_counter = InCount,
- persistent_count = PCount,
- durable = IsDurable,
- ram_msg_count = RamMsgCount,
- unconfirmed = UC }) ->
+ IsDelivered, _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
+ next_seq_id = SeqId,
+ len = Len,
+ in_counter = InCount,
+ persistent_count = PCount,
+ durable = IsDurable,
+ ram_msg_count = RamMsgCount,
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = msg_status(IsPersistent1, SeqId, Msg, MsgProps),
+ MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps),
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = case ?QUEUE:is_empty(Q3) of
false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
@@ -556,8 +556,7 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
durable = IsDurable,
unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
- #msg_status { is_delivered = true },
+ MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps),
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = record_pending_ack(m(MsgStatus1), State1),
PCount1 = PCount + one_if(IsPersistent1),
@@ -591,8 +590,8 @@ dropwhile(Pred, AckRequired, State, Msgs) ->
case {Pred(MsgProps), AckRequired} of
{true, true} ->
{MsgStatus1, State2} = read_msg(MsgStatus, State1),
- {{Msg, _, AckTag, _}, State3} =
- internal_fetch(true, MsgStatus1, State2),
+ {{Msg, _IsDelivered, AckTag}, State3} =
+ internal_fetch(true, MsgStatus1, State2),
dropwhile(Pred, AckRequired, State3, [{Msg, AckTag} | Msgs]);
{true, false} ->
{_, State2} = internal_fetch(false, MsgStatus, State1),
@@ -619,9 +618,9 @@ drop(AckRequired, State) ->
{empty, State1} ->
{empty, a(State1)};
{{value, MsgStatus}, State1} ->
- {{_Msg, _IsDelivered, AckTag, Remaining}, State2} =
+ {{_Msg, _IsDelivered, AckTag}, State2} =
internal_fetch(AckRequired, MsgStatus, State1),
- {{MsgStatus#msg_status.msg_id, AckTag, Remaining}, a(State2)}
+ {{MsgStatus#msg_status.msg_id, AckTag}, a(State2)}
end.
ack([], State) ->
@@ -749,7 +748,7 @@ ram_duration(State = #vqstate {
{AvgAckIngressRate, AckIngress1} =
update_rate(Now, AckTimestamp, AckInCount, AckIngress),
- RamAckCount = gb_trees:size(RamAckIndex),
+ RamAckCount = gb_sets:size(RamAckIndex),
Duration = %% msgs+acks / (msgs+acks/sec) == sec
case (AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso
@@ -828,7 +827,7 @@ status(#vqstate {
{pending_acks , gb_trees:size(PA)},
{target_ram_count , TargetRamCount},
{ram_msg_count , RamMsgCount},
- {ram_ack_count , gb_trees:size(RAI)},
+ {ram_ack_count , gb_sets:size(RAI)},
{next_seq_id , NextSeqId},
{persistent_count , PersistentCount},
{avg_ingress_rate , AvgIngressRate},
@@ -891,11 +890,10 @@ gb_sets_maybe_insert(false, _Val, Set) -> Set;
%% when requeueing, we re-add a msg_id to the unconfirmed set
gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
-msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId },
- MsgProps = #message_properties { delivered = Delivered }) ->
- %% TODO would it make sense to remove #msg_status.is_delivered?
+msg_status(IsPersistent, IsDelivered, SeqId,
+ Msg = #basic_message { id = MsgId }, MsgProps) ->
#msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg,
- is_persistent = IsPersistent, is_delivered = Delivered,
+ is_persistent = IsPersistent, is_delivered = IsDelivered,
msg_on_disk = false, index_on_disk = false,
msg_props = MsgProps }.
@@ -1033,7 +1031,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
q4 = ?QUEUE:new(),
next_seq_id = NextSeqId,
pending_ack = gb_trees:empty(),
- ram_ack_index = gb_trees:empty(),
+ ram_ack_index = gb_sets:empty(),
index_state = IndexState1,
msg_store_clients = {PersistentClient, TransientClient},
durable = IsDurable,
@@ -1145,14 +1143,13 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
end,
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
- Len1 = Len - 1,
RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
- {{Msg, IsDelivered, AckTag, Len1},
+ {{Msg, IsDelivered, AckTag},
State1 #vqstate { ram_msg_count = RamMsgCount1,
out_counter = OutCount + 1,
index_state = IndexState2,
- len = Len1,
+ len = Len - 1,
persistent_count = PCount1 }}.
purge_betas_and_deltas(LensByStore,
@@ -1250,18 +1247,15 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
%% Internal gubbins for acks
%%----------------------------------------------------------------------------
-record_pending_ack(#msg_status { seq_id = SeqId,
- msg_id = MsgId,
- msg_on_disk = MsgOnDisk } = MsgStatus,
+record_pending_ack(#msg_status { seq_id = SeqId, msg = Msg } = MsgStatus,
State = #vqstate { pending_ack = PA,
ram_ack_index = RAI,
ack_in_counter = AckInCount}) ->
- {AckEntry, RAI1} =
- case MsgOnDisk of
- true -> {m(trim_msg_status(MsgStatus)), RAI};
- false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)}
- end,
- State #vqstate { pending_ack = gb_trees:insert(SeqId, AckEntry, PA),
+ RAI1 = case Msg of
+ undefined -> RAI;
+ _ -> gb_sets:insert(SeqId, RAI)
+ end,
+ State #vqstate { pending_ack = gb_trees:insert(SeqId, MsgStatus, PA),
ram_ack_index = RAI1,
ack_in_counter = AckInCount + 1}.
@@ -1269,7 +1263,7 @@ remove_pending_ack(SeqId, State = #vqstate { pending_ack = PA,
ram_ack_index = RAI }) ->
{gb_trees:get(SeqId, PA),
State #vqstate { pending_ack = gb_trees:delete(SeqId, PA),
- ram_ack_index = gb_trees:delete_any(SeqId, RAI) }}.
+ ram_ack_index = gb_sets:delete_any(SeqId, RAI) }}.
purge_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
@@ -1280,7 +1274,7 @@ purge_pending_ack(KeepPersistent,
accumulate_ack(MsgStatus, Acc)
end, accumulate_ack_init(), PA),
State1 = State #vqstate { pending_ack = gb_trees:empty(),
- ram_ack_index = gb_trees:empty() },
+ ram_ack_index = gb_sets:empty() },
case KeepPersistent of
true -> case orddict:find(false, MsgIdsByStore) of
error -> State1;
@@ -1501,7 +1495,7 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
}) ->
{Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} =
- case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex),
+ case chunk_size(RamMsgCount + gb_sets:size(RamAckIndex),
TargetRamCount) of
0 -> {false, State};
%% Reduce memory of pending acks and alphas. The order is
@@ -1529,13 +1523,12 @@ limit_ram_acks(0, State) ->
{0, State};
limit_ram_acks(Quota, State = #vqstate { pending_ack = PA,
ram_ack_index = RAI }) ->
- case gb_trees:is_empty(RAI) of
+ case gb_sets:is_empty(RAI) of
true ->
{Quota, State};
false ->
- {SeqId, MsgId, RAI1} = gb_trees:take_largest(RAI),
- MsgStatus = #msg_status { msg_id = MsgId, is_persistent = false} =
- gb_trees:get(SeqId, PA),
+ {SeqId, RAI1} = gb_sets:take_largest(RAI),
+ MsgStatus = gb_trees:get(SeqId, PA),
{MsgStatus1, State1} =
maybe_write_to_disk(true, false, MsgStatus, State),
PA1 = gb_trees:update(SeqId, m(trim_msg_status(MsgStatus1)), PA),