summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-03 21:09:43 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-03 21:09:43 +0000
commit0ca5bfa7716f41578533cd84ac62b2e62ff0aedf (patch)
treedb241ad90a88f4a3e278594f9d2537f6635afc5f
parent123584aed5eb4fd71fe6090e525c339843081630 (diff)
parent706b00d72a2681b5dd138740dbc1b104304796ac (diff)
downloadrabbitmq-server-0ca5bfa7716f41578533cd84ac62b2e62ff0aedf.tar.gz
merge default into bug24407
-rw-r--r--include/rabbit.hrl3
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rw-r--r--src/credit_flow.erl75
-rw-r--r--src/delegate.erl12
-rw-r--r--src/gm.erl10
-rw-r--r--src/pmon.erl6
-rw-r--r--src/rabbit_amqqueue.erl92
-rw-r--r--src/rabbit_amqqueue_process.erl77
-rw-r--r--src/rabbit_auth_backend_internal.erl10
-rw-r--r--src/rabbit_backing_queue.erl42
-rw-r--r--src/rabbit_backing_queue_qc.erl8
-rw-r--r--src/rabbit_channel.erl251
-rw-r--r--src/rabbit_exchange.erl43
-rw-r--r--src/rabbit_guid.erl19
-rw-r--r--src/rabbit_mirror_queue_master.erl90
-rw-r--r--src/rabbit_misc.erl19
-rw-r--r--src/rabbit_mnesia.erl23
-rw-r--r--src/rabbit_node_monitor.erl2
-rw-r--r--src/rabbit_reader.erl203
-rw-r--r--src/rabbit_tests.erl81
-rw-r--r--src/rabbit_trace.erl37
-rw-r--r--src/rabbit_variable_queue.erl157
-rw-r--r--src/rabbit_vhost.erl2
-rw-r--r--src/rabbit_vm.erl21
25 files changed, 677 insertions, 615 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 0ccb80bf..7385b4b3 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -27,9 +27,6 @@
-record(vhost, {virtual_host, dummy}).
--record(connection, {protocol, user, timeout_sec, frame_max, vhost,
- client_properties, capabilities}).
-
-record(content,
{class_id,
properties, %% either 'none', or a decoded record/tuple
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 5d9b9e2e..6e02c7a4 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -123,6 +123,9 @@ done
rm -rf %{buildroot}
%changelog
+* Tue Dec 11 2012 simon@rabbitmq.com 3.0.1-1
+- New Upstream Release
+
* Fri Nov 16 2012 simon@rabbitmq.com 3.0.0-1
- New Upstream Release
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index 17327133..aed68b96 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (3.0.1-1) unstable; urgency=low
+
+ * New Upstream Release
+
+ -- Simon MacMullen <simon@rabbitmq.com> Tue, 11 Dec 2012 11:29:55 +0000
+
rabbitmq-server (3.0.0-1) unstable; urgency=low
* New Upstream Release
diff --git a/src/credit_flow.erl b/src/credit_flow.erl
index c2bec7c7..dff339fc 100644
--- a/src/credit_flow.erl
+++ b/src/credit_flow.erl
@@ -52,6 +52,22 @@
%%----------------------------------------------------------------------------
+%% process dict update macro - eliminates the performance-hurting
+%% closure creation a HOF would introduce
+-define(UPDATE(Key, Default, Var, Expr),
+ begin
+ %% We deliberately allow Var to escape from the case here
+ %% to be used in Expr. Any temporary var we introduced
+ %% would also escape, and might conflict.
+ case get(Key) of
+ undefined -> Var = Default;
+ Var -> ok
+ end,
+ put(Key, Expr)
+ end).
+
+%%----------------------------------------------------------------------------
+
%% There are two "flows" here; of messages and of credit, going in
%% opposite directions. The variable names "From" and "To" refer to
%% the flow of credit, but the function names refer to the flow of
@@ -66,29 +82,33 @@
send(From) -> send(From, ?DEFAULT_CREDIT).
send(From, {InitialCredit, _MoreCreditAfter}) ->
- update({credit_from, From}, InitialCredit,
- fun (1) -> block(From),
- 0;
- (C) -> C - 1
- end).
+ ?UPDATE({credit_from, From}, InitialCredit, C,
+ if C == 1 -> block(From),
+ 0;
+ true -> C - 1
+ end).
ack(To) -> ack(To, ?DEFAULT_CREDIT).
ack(To, {_InitialCredit, MoreCreditAfter}) ->
- update({credit_to, To}, MoreCreditAfter,
- fun (1) -> grant(To, MoreCreditAfter),
- MoreCreditAfter;
- (C) -> C - 1
- end).
+ ?UPDATE({credit_to, To}, MoreCreditAfter, C,
+ if C == 1 -> grant(To, MoreCreditAfter),
+ MoreCreditAfter;
+ true -> C - 1
+ end).
handle_bump_msg({From, MoreCredit}) ->
- update({credit_from, From}, 0,
- fun (C) when C =< 0 andalso C + MoreCredit > 0 -> unblock(From),
- C + MoreCredit;
- (C) -> C + MoreCredit
- end).
-
-blocked() -> get(credit_blocked, []) =/= [].
+ ?UPDATE({credit_from, From}, 0, C,
+ if C =< 0 andalso C + MoreCredit > 0 -> unblock(From),
+ C + MoreCredit;
+ true -> C + MoreCredit
+ end).
+
+blocked() -> case get(credit_blocked) of
+ undefined -> false;
+ [] -> false;
+ _ -> true
+ end.
peer_down(Peer) ->
%% In theory we could also remove it from credit_deferred here, but it
@@ -105,24 +125,17 @@ grant(To, Quantity) ->
Msg = {bump_credit, {self(), Quantity}},
case blocked() of
false -> To ! Msg;
- true -> update(credit_deferred, [],
- fun (Deferred) -> [{To, Msg} | Deferred] end)
+ true -> ?UPDATE(credit_deferred, [], Deferred, [{To, Msg} | Deferred])
end.
-block(From) -> update(credit_blocked, [], fun (Blocks) -> [From | Blocks] end).
+block(From) -> ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]).
unblock(From) ->
- update(credit_blocked, [], fun (Blocks) -> Blocks -- [From] end),
+ ?UPDATE(credit_blocked, [], Blocks, Blocks -- [From]),
case blocked() of
- false -> [To ! Msg || {To, Msg} <- get(credit_deferred, [])],
- erase(credit_deferred);
+ false -> case erase(credit_deferred) of
+ undefined -> ok;
+ Credits -> [To ! Msg || {To, Msg} <- Credits]
+ end;
true -> ok
end.
-
-get(Key, Default) ->
- case get(Key) of
- undefined -> Default;
- Value -> Value
- end.
-
-update(Key, Default, Fun) -> put(Key, Fun(get(Key, Default))), ok.
diff --git a/src/delegate.erl b/src/delegate.erl
index d595e481..9222c34c 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server2).
--export([start_link/1, invoke_no_result/2, invoke/2]).
+-export([start_link/1, invoke_no_result/2, invoke/2, call/2, cast/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -35,6 +35,10 @@
[{pid(), term()}]}).
-spec(invoke_no_result/2 ::
(pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
+-spec(call/2 ::
+ ( pid(), any()) -> any();
+ ([pid()], any()) -> {[{pid(), any()}], [{pid(), term()}]}).
+-spec(cast/2 :: (pid() | [pid()], any()) -> 'ok').
-endif.
@@ -96,6 +100,12 @@ invoke_no_result(Pids, Fun) when is_list(Pids) ->
safe_invoke(LocalPids, Fun), %% must not die
ok.
+call(PidOrPids, Msg) ->
+ invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end).
+
+cast(PidOrPids, Msg) ->
+ invoke_no_result(PidOrPids, fun (P) -> gen_server2:cast(P, Msg) end).
+
%%----------------------------------------------------------------------------
group_pids_by_node(Pids) ->
diff --git a/src/gm.erl b/src/gm.erl
index 4a95de0d..2057b1f5 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -667,6 +667,9 @@ handle_info(flush, State) ->
noreply(
flush_broadcast_buffer(State #state { broadcast_timer = undefined }));
+handle_info(timeout, State) ->
+ noreply(flush_broadcast_buffer(State));
+
handle_info({'DOWN', MRef, process, _Pid, Reason},
State = #state { self = Self,
left = Left,
@@ -834,10 +837,13 @@ handle_msg({activity, _NotLeft, _Activity}, State) ->
noreply(State) ->
- {noreply, ensure_broadcast_timer(State), hibernate}.
+ {noreply, ensure_broadcast_timer(State), flush_timeout(State)}.
reply(Reply, State) ->
- {reply, Reply, ensure_broadcast_timer(State), hibernate}.
+ {reply, Reply, ensure_broadcast_timer(State), flush_timeout(State)}.
+
+flush_timeout(#state{broadcast_buffer = []}) -> hibernate;
+flush_timeout(_) -> 0.
ensure_broadcast_timer(State = #state { broadcast_buffer = [],
broadcast_timer = undefined }) ->
diff --git a/src/pmon.erl b/src/pmon.erl
index 1aeebb72..37898119 100644
--- a/src/pmon.erl
+++ b/src/pmon.erl
@@ -19,6 +19,8 @@
-export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2,
monitored/1, is_empty/1]).
+-compile({no_auto_import, [monitor/2]}).
+
-ifdef(use_specs).
%%----------------------------------------------------------------------------
@@ -48,7 +50,9 @@ monitor(Item, M) ->
false -> dict:store(Item, erlang:monitor(process, Item), M)
end.
-monitor_all(Items, M) -> lists:foldl(fun monitor/2, M, Items).
+monitor_all([], M) -> M; %% optimisation
+monitor_all([Item], M) -> monitor(Item, M); %% optimisation
+monitor_all(Items, M) -> lists:foldl(fun monitor/2, M, Items).
demonitor(Item, M) ->
case dict:find(Item, M) of
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 35b4fadf..71dc8257 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -286,7 +286,11 @@ store_queue(Q = #amqqueue{durable = false}) ->
ok = mnesia:write(rabbit_queue, Q, write),
ok.
-policy_changed(Q1, Q2) -> rabbit_mirror_queue_misc:update_mirrors(Q1, Q2).
+policy_changed(Q1, Q2) ->
+ rabbit_mirror_queue_misc:update_mirrors(Q1, Q2),
+ %% Make sure we emit a stats event even if nothing
+ %% mirroring-related has changed - the policy may have changed anyway.
+ wake_up(Q1).
start_queue_process(Node, Q) ->
{ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]),
@@ -300,6 +304,8 @@ add_default_binding(#amqqueue{name = QueueName}) ->
key = RoutingKey,
args = []}).
+lookup([]) -> []; %% optimisation
+lookup([Name]) -> ets:lookup(rabbit_queue, Name); %% optimisation
lookup(Names) when is_list(Names) ->
%% Normally we'd call mnesia:dirty_read/1 here, but that is quite
%% expensive for reasons explained in rabbit_misc:dirty_read/1.
@@ -440,10 +446,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
-info(#amqqueue{ pid = QPid }) -> delegate_call(QPid, info).
+info(#amqqueue{ pid = QPid }) -> delegate:call(QPid, info).
info(#amqqueue{ pid = QPid }, Items) ->
- case delegate_call(QPid, {info, Items}) of
+ case delegate:call(QPid, {info, Items}) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -474,7 +480,7 @@ force_event_refresh(QNames) ->
wake_up(#amqqueue{pid = QPid}) -> gen_server2:cast(QPid, wake_up).
-consumers(#amqqueue{ pid = QPid }) -> delegate_call(QPid, consumers).
+consumers(#amqqueue{ pid = QPid }) -> delegate:call(QPid, consumers).
consumer_info_keys() -> ?CONSUMER_INFO_KEYS.
@@ -488,47 +494,51 @@ consumers_all(VHostPath) ->
{ChPid, ConsumerTag, AckRequired} <- consumers(Q)]
end)).
-stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat).
+stat(#amqqueue{pid = QPid}) -> delegate:call(QPid, stat).
delete_immediately(QPids) ->
[gen_server2:cast(QPid, delete_immediately) || QPid <- QPids],
ok.
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
- delegate_call(QPid, {delete, IfUnused, IfEmpty}).
+ delegate:call(QPid, {delete, IfUnused, IfEmpty}).
-purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge).
+purge(#amqqueue{ pid = QPid }) -> delegate:call(QPid, purge).
deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow).
deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow).
-requeue(QPid, MsgIds, ChPid) -> delegate_call(QPid, {requeue, MsgIds, ChPid}).
+requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}).
-ack(QPid, MsgIds, ChPid) -> delegate_cast(QPid, {ack, MsgIds, ChPid}).
+ack(QPid, MsgIds, ChPid) -> delegate:cast(QPid, {ack, MsgIds, ChPid}).
reject(QPid, MsgIds, Requeue, ChPid) ->
- delegate_cast(QPid, {reject, MsgIds, Requeue, ChPid}).
+ delegate:cast(QPid, {reject, MsgIds, Requeue, ChPid}).
notify_down_all(QPids, ChPid) ->
- safe_delegate_call_ok(
- fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end,
- QPids).
+ {_, Bads} = delegate:call(QPids, {notify_down, ChPid}),
+ case lists:filter(
+ fun ({_Pid, {exit, {R, _}, _}}) -> rabbit_misc:is_abnormal_exit(R);
+ ({_Pid, _}) -> false
+ end, Bads) of
+ [] -> ok;
+ Bads1 -> {error, Bads1}
+ end.
limit_all(QPids, ChPid, Limiter) ->
- delegate:invoke_no_result(
- QPids, fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, Limiter}) end).
+ delegate:cast(QPids, {limit, ChPid, Limiter}).
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
- delegate_call(QPid, {basic_get, ChPid, NoAck}).
+ delegate:call(QPid, {basic_get, ChPid, NoAck}).
basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter,
ConsumerTag, ExclusiveConsume, OkMsg) ->
- delegate_call(QPid, {basic_consume, NoAck, ChPid,
+ delegate:call(QPid, {basic_consume, NoAck, ChPid,
Limiter, ConsumerTag, ExclusiveConsume, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
- delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
+ delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
notify_sent(QPid, ChPid) ->
Key = {consumer_credit_to, QPid},
@@ -547,11 +557,9 @@ notify_sent_queue_down(QPid) ->
erase({consumer_credit_to, QPid}),
ok.
-unblock(QPid, ChPid) -> delegate_cast(QPid, {unblock, ChPid}).
+unblock(QPid, ChPid) -> delegate:cast(QPid, {unblock, ChPid}).
-flush_all(QPids, ChPid) ->
- delegate:invoke_no_result(
- QPids, fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end).
+flush_all(QPids, ChPid) -> delegate:cast(QPids, {flush, ChPid}).
internal_delete1(QueueName) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
@@ -589,8 +597,8 @@ set_ram_duration_target(QPid, Duration) ->
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
-start_mirroring(QPid) -> ok = delegate_cast(QPid, start_mirroring).
-stop_mirroring(QPid) -> ok = delegate_cast(QPid, stop_mirroring).
+start_mirroring(QPid) -> ok = delegate:cast(QPid, start_mirroring).
+stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring).
sync_mirrors(QPid) -> delegate_call(QPid, sync_mirrors).
@@ -654,10 +662,8 @@ deliver(Qs, Delivery = #delivery{mandatory = false}, Flow) ->
%% done with it.
MMsg = {deliver, Delivery, false, Flow},
SMsg = {deliver, Delivery, true, Flow},
- delegate:invoke_no_result(MPids,
- fun (QPid) -> gen_server2:cast(QPid, MMsg) end),
- delegate:invoke_no_result(SPids,
- fun (QPid) -> gen_server2:cast(QPid, SMsg) end),
+ delegate:cast(MPids, MMsg),
+ delegate:cast(SPids, SMsg),
{routed, QPids};
deliver(Qs, Delivery, _Flow) ->
@@ -665,14 +671,8 @@ deliver(Qs, Delivery, _Flow) ->
%% see comment above
MMsg = {deliver, Delivery, false},
SMsg = {deliver, Delivery, true},
- {MRouted, _} = delegate:invoke(
- MPids, fun (QPid) ->
- ok = gen_server2:call(QPid, MMsg, infinity)
- end),
- {SRouted, _} = delegate:invoke(
- SPids, fun (QPid) ->
- ok = gen_server2:call(QPid, SMsg, infinity)
- end),
+ {MRouted, _} = delegate:call(MPids, MMsg),
+ {SRouted, _} = delegate:call(SPids, SMsg),
case MRouted ++ SRouted of
[] -> {unroutable, []};
R -> {routed, [QPid || {QPid, ok} <- R]}
@@ -684,23 +684,3 @@ qpids(Qs) ->
{[QPid | MPidAcc], [SPids | SPidAcc]}
end, {[], []}, Qs),
{MPids, lists:append(SPids)}.
-
-safe_delegate_call_ok(F, Pids) ->
- {_, Bads} = delegate:invoke(Pids, fun (Pid) ->
- rabbit_misc:with_exit_handler(
- fun () -> ok end,
- fun () -> F(Pid) end)
- end),
- case lists:filter(
- fun ({_Pid, {exit, {R, _}, _}}) -> rabbit_misc:is_abnormal_exit(R);
- ({_Pid, _}) -> false
- end, Bads) of
- [] -> ok;
- Bads1 -> {error, Bads1}
- end.
-
-delegate_call(Pid, Msg) ->
- delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, infinity) end).
-
-delegate_cast(Pid, Msg) ->
- delegate:invoke_no_result(Pid, fun (P) -> gen_server2:cast(P, Msg) end).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 730c235e..326065d1 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -262,7 +262,7 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
-init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}).
+init_ttl(TTL, State) -> drop_expired_msgs(State#q{ttl = TTL}).
init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) ->
State#q{dlx = rabbit_misc:r(QName, exchange, DLX)}.
@@ -283,21 +283,17 @@ terminate_shutdown(Fun, State) ->
end.
reply(Reply, NewState) ->
- assert_invariant(NewState),
{NewState1, Timeout} = next_state(NewState),
- {reply, Reply, NewState1, Timeout}.
+ {reply, Reply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}.
noreply(NewState) ->
- assert_invariant(NewState),
{NewState1, Timeout} = next_state(NewState),
- {noreply, NewState1, Timeout}.
+ {noreply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}.
next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ assert_invariant(State),
{MsgIds, BQS1} = BQ:drain_confirmed(BQS),
- State1 = ensure_stats_timer(
- ensure_rate_timer(
- confirm_messages(MsgIds, State#q{
- backing_queue_state = BQS1}))),
+ State1 = confirm_messages(MsgIds, State#q{backing_queue_state = BQS1}),
case BQ:needs_timeout(BQS1) of
false -> {stop_sync_timer(State1), hibernate };
idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL};
@@ -327,15 +323,11 @@ ensure_rate_timer(State = #q{rate_timer_ref = undefined}) ->
TRef = erlang:send_after(
?RAM_DURATION_UPDATE_INTERVAL, self(), update_ram_duration),
State#q{rate_timer_ref = TRef};
-ensure_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
- State#q{rate_timer_ref = undefined};
ensure_rate_timer(State) ->
State.
stop_rate_timer(State = #q{rate_timer_ref = undefined}) ->
State;
-stop_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
- State#q{rate_timer_ref = undefined};
stop_rate_timer(State = #q{rate_timer_ref = TRef}) ->
erlang:cancel_timer(TRef),
State#q{rate_timer_ref = undefined}.
@@ -487,7 +479,7 @@ deliver_msg_to_consumer(DeliverFun,
deliver_from_queue_deliver(AckRequired, State) ->
{Result, State1} = fetch(AckRequired, State),
State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_messages(State1),
+ drop_expired_msgs(State1),
{Result, BQ:is_empty(BQS), State2}.
confirm_messages([], State) ->
@@ -534,7 +526,7 @@ discard(#delivery{sender = SenderPid, message = #basic_message{id = MsgId}},
run_message_queue(State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_messages(State),
+ drop_expired_msgs(State),
{_IsEmpty1, State2} = deliver_msgs_to_consumers(
fun deliver_from_queue_deliver/2,
BQ:is_empty(BQS), State1),
@@ -719,20 +711,21 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
T -> now_micros() + T * 1000
end.
-drop_expired_messages(State = #q{dlx = DLX,
- backing_queue_state = BQS,
- backing_queue = BQ }) ->
+drop_expired_msgs(State = #q{dlx = DLX,
+ backing_queue_state = BQS,
+ backing_queue = BQ }) ->
Now = now_micros(),
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
{Props, BQS1} = case DLX of
- undefined -> {Next, undefined, BQS2} =
- BQ:dropwhile(ExpirePred, false, BQS),
- {Next, BQS2};
- _ -> {Next, Msgs, BQS2} =
- BQ:dropwhile(ExpirePred, true, BQS),
+ undefined -> BQ:dropwhile(ExpirePred, BQS);
+ _ -> {Next, Msgs, BQS2} =
+ BQ:fetchwhile(ExpirePred,
+ fun accumulate_msgs/3,
+ [], BQS),
case Msgs of
[] -> ok;
- _ -> (dead_letter_fun(expired))(Msgs)
+ _ -> (dead_letter_fun(expired))(
+ lists:reverse(Msgs))
end,
{Next, BQS2}
end,
@@ -741,6 +734,8 @@ drop_expired_messages(State = #q{dlx = DLX,
#message_properties{expiry = Exp} -> Exp
end, State#q{backing_queue_state = BQS1}).
+accumulate_msgs(Msg, AckTag, Acc) -> [{Msg, AckTag} | Acc].
+
ensure_ttl_timer(undefined, State) ->
State;
ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) ->
@@ -796,12 +791,9 @@ stop(State) -> stop(undefined, noreply, State).
stop(From, Reply, State = #q{unconfirmed = UC}) ->
case {dtree:is_empty(UC), Reply} of
- {true, noreply} ->
- {stop, normal, State};
- {true, _} ->
- {stop, normal, Reply, State};
- {false, _} ->
- noreply(State#q{delayed_stop = {From, Reply}})
+ {true, noreply} -> {stop, normal, State};
+ {true, _} -> {stop, normal, Reply, State};
+ {false, _} -> noreply(State#q{delayed_stop = {From, Reply}})
end.
cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
@@ -1058,7 +1050,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
State = #q{q = #amqqueue{name = QName}}) ->
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
- case fetch(AckRequired, drop_expired_messages(State1)) of
+ case fetch(AckRequired, drop_expired_msgs(State1)) of
{empty, State2} ->
reply(empty, State2);
{{Message, IsDelivered, AckTag}, State2} ->
@@ -1131,7 +1123,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
handle_call(stat, _From, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_messages(ensure_expiry_timer(State)),
+ drop_expired_msgs(ensure_expiry_timer(State)),
reply({ok, BQ:len(BQS), active_consumer_count()}, State1);
handle_call({delete, IfUnused, IfEmpty}, From,
@@ -1225,8 +1217,9 @@ handle_cast({reject, AckTags, false, ChPid}, State) ->
ChPid, AckTags, State,
fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- BQS1 = BQ:foreach_ack(fun(M, A) -> DLXFun([{M, A}]) end,
- BQS, AckTags),
+ {ok, BQS1} = BQ:ackfold(
+ fun (M, A, ok) -> DLXFun([{M, A}]) end,
+ ok, BQS, AckTags),
State1#q{backing_queue_state = BQS1}
end));
@@ -1335,14 +1328,14 @@ handle_info(maybe_expire, State) ->
end;
handle_info(drop_expired, State) ->
- noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined}));
+ noreply(drop_expired_msgs(State#q{ttl_timer_ref = undefined}));
handle_info(emit_stats, State) ->
emit_stats(State),
- {noreply, State1, Timeout} = noreply(State),
- %% Need to reset *after* we've been through noreply/1 so we do not
- %% just create another timer always and therefore never hibernate
- {noreply, rabbit_event:reset_stats_timer(State1, #q.stats_timer), Timeout};
+ %% Don't call noreply/1, we don't want to set timers
+ {State1, Timeout} = next_state(rabbit_event:reset_stats_timer(
+ State, #q.stats_timer)),
+ {noreply, State1, Timeout};
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
State = #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
@@ -1366,8 +1359,10 @@ handle_info(update_ram_duration, State = #q{backing_queue = BQ,
DesiredDuration =
rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- noreply(State#q{rate_timer_ref = just_measured,
- backing_queue_state = BQS2});
+ %% Don't call noreply/1, we don't want to set timers
+ {State1, Timeout} = next_state(State#q{rate_timer_ref = undefined,
+ backing_queue_state = BQS2}),
+ {noreply, State1, Timeout};
handle_info(sync_timeout, State) ->
noreply(backing_queue_timeout(State#q{sync_timer_ref = undefined}));
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index 7b9df81e..919be3f3 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -49,7 +49,7 @@
-spec(hash_password/1 :: (rabbit_types:password())
-> rabbit_types:password_hash()).
-spec(set_tags/2 :: (rabbit_types:username(), [atom()]) -> 'ok').
--spec(list_users/0 :: () -> rabbit_types:infos()).
+-spec(list_users/0 :: () -> [rabbit_types:infos()]).
-spec(user_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(lookup_user/1 :: (rabbit_types:username())
-> rabbit_types:ok(rabbit_types:internal_user())
@@ -58,14 +58,14 @@
regexp(), regexp(), regexp()) -> 'ok').
-spec(clear_permissions/2 :: (rabbit_types:username(), rabbit_types:vhost())
-> 'ok').
--spec(list_permissions/0 :: () -> rabbit_types:infos()).
+-spec(list_permissions/0 :: () -> [rabbit_types:infos()]).
-spec(list_vhost_permissions/1 ::
- (rabbit_types:vhost()) -> rabbit_types:infos()).
+ (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(list_user_permissions/1 ::
- (rabbit_types:username()) -> rabbit_types:infos()).
+ (rabbit_types:username()) -> [rabbit_types:infos()]).
-spec(list_user_vhost_permissions/2 ::
(rabbit_types:username(), rabbit_types:vhost())
- -> rabbit_types:infos()).
+ -> [rabbit_types:infos()]).
-spec(perms_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(vhost_perms_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(user_perms_info_keys/0 :: () -> rabbit_types:info_keys()).
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 96c58cb9..99b5946e 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -35,8 +35,7 @@
fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
-type(duration() :: ('undefined' | 'infinity' | number())).
--type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') |
- 'undefined').
+-type(msg_fun(A) :: fun ((rabbit_types:basic_message(), ack(), A) -> A)).
-type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())).
%% Called on startup with a list of durable queue names. The queues
@@ -124,16 +123,22 @@
%% be ignored.
-callback drain_confirmed(state()) -> {msg_ids(), state()}.
-%% Drop messages from the head of the queue while the supplied predicate returns
-%% true. Also accepts a boolean parameter that determines whether the messages
-%% necessitate an ack or not. If they do, the function returns a list of
-%% messages with the respective acktags.
--callback dropwhile(msg_pred(), true, state())
- -> {rabbit_types:message_properties() | undefined,
- [{rabbit_types:basic_message(), ack()}], state()};
- (msg_pred(), false, state())
- -> {rabbit_types:message_properties() | undefined,
- undefined, state()}.
+%% Drop messages from the head of the queue while the supplied
+%% predicate on message properties returns true. Returns the first
+%% message properties for which the predictate returned false, or
+%% 'undefined' if the whole backing queue was traversed w/o the
+%% predicate ever returning false.
+-callback dropwhile(msg_pred(), state())
+ -> {rabbit_types:message_properties() | undefined, state()}.
+
+%% Like dropwhile, except messages are fetched in "require
+%% acknowledgement" mode and are passed, together with their ack tag,
+%% to the supplied function. The function is also fed an
+%% accumulator. The result of fetchwhile is as for dropwhile plus the
+%% accumulator.
+-callback fetchwhile(msg_pred(), msg_fun(A), A, state())
+ -> {rabbit_types:message_properties() | undefined,
+ A, state()}.
%% Produce the next message.
-callback fetch(true, state()) -> {fetch_result(ack()), state()};
@@ -147,14 +152,14 @@
%% about. Must return 1 msg_id per Ack, in the same order as Acks.
-callback ack([ack()], state()) -> {msg_ids(), state()}.
-%% Acktags supplied are for messages which should be processed. The
-%% provided callback function is called with each message.
--callback foreach_ack(msg_fun(), state(), [ack()]) -> state().
-
%% Reinsert messages into the queue which have already been delivered
%% and were pending acknowledgement.
-callback requeue([ack()], state()) -> {msg_ids(), state()}.
+%% Fold over messages by ack tag. The supplied function is called with
+%% each message, its ack tag, and an accumulator.
+-callback ackfold(msg_fun(A), A, state(), [ack()]) -> {A, state()}.
+
%% Fold over all the messages in a queue and return the accumulated
%% results, leaving the queue undisturbed.
-callback fold(fun((rabbit_types:basic_message(),
@@ -222,8 +227,9 @@
behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
{delete_and_terminate, 2}, {purge, 1}, {publish, 5},
- {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3},
- {fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {fold, 3}, {len, 1},
+ {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1},
+ {dropwhile, 2}, {fetchwhile, 4},
+ {fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
{handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ;
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index a5d0a008..5b3b8aa8 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -115,7 +115,7 @@ qc_publish(#state{bqstate = BQ}) ->
#message_properties{needs_confirming = frequency([{1, true},
{20, false}]),
expiry = oneof([undefined | lists:seq(1, 10)])},
- self(), BQ]}.
+ false, self(), BQ]}.
qc_publish_multiple(#state{}) ->
{call, ?MODULE, publish_multiple, [resize(?QUEUE_MAXLEN, pos_integer())]}.
@@ -147,7 +147,7 @@ qc_drain_confirmed(#state{bqstate = BQ}) ->
{call, ?BQMOD, drain_confirmed, [BQ]}.
qc_dropwhile(#state{bqstate = BQ}) ->
- {call, ?BQMOD, dropwhile, [fun dropfun/1, false, BQ]}.
+ {call, ?BQMOD, dropwhile, [fun dropfun/1, BQ]}.
qc_is_empty(#state{bqstate = BQ}) ->
{call, ?BQMOD, is_empty, [BQ]}.
@@ -182,7 +182,7 @@ precondition(#state{len = Len}, {call, ?MODULE, publish_multiple, _Arg}) ->
%% Model updates
-next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) ->
+next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Del, _Pid, _BQ]}) ->
#state{len = Len,
messages = Messages,
confirms = Confirms,
@@ -262,7 +262,7 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) ->
S#state{bqstate = BQ1};
next_state(S, Res, {call, ?BQMOD, dropwhile, _Args}) ->
- BQ = {call, erlang, element, [3, Res]},
+ BQ = {call, erlang, element, [2, Res]},
#state{messages = Messages} = S,
Msgs1 = drop_messages(Messages),
S#state{bqstate = BQ, len = gb_trees:size(Msgs1), messages = Msgs1};
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b1ef3b6b..1af60de8 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -33,14 +33,15 @@
-export([list_local/0]).
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
- conn_name, limiter, tx_status, next_tag, unacked_message_q,
- uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user,
+ conn_name, limiter, tx, next_tag, unacked_message_q, user,
virtual_host, most_recently_declared_queue,
queue_names, queue_monitors, consumer_mapping,
blocking, queue_consumers, delivering_queues,
queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
unconfirmed, confirmed, capabilities, trace_state}).
+-record(tx, {msgs, acks, nacks}).
+
-define(MAX_PERMISSION_CACHE_SIZE, 12).
-define(STATISTICS_KEYS,
@@ -65,6 +66,12 @@
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+-define(INCR_STATS(Incs, Measure, State),
+ case rabbit_event:stats_level(State, #ch.stats_timer) of
+ fine -> incr_stats(Incs, Measure);
+ _ -> ok
+ end).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -186,12 +193,9 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
conn_pid = ConnPid,
conn_name = ConnName,
limiter = Limiter,
- tx_status = none,
+ tx = none,
next_tag = 1,
unacked_message_q = queue:new(),
- uncommitted_message_q = queue:new(),
- uncommitted_acks = [],
- uncommitted_nacks = [],
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
@@ -314,9 +318,12 @@ handle_cast({deliver, ConsumerTag, AckRequired,
handle_cast(force_event_refresh, State) ->
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
noreply(State);
+
handle_cast({confirm, MsgSeqNos, From}, State) ->
State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State),
- noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end).
+ Timeout = case C of [] -> hibernate; _ -> 0 end,
+ %% NB: don't call noreply/1 since we don't want to send confirms.
+ {noreply, ensure_stats_timer(State1), Timeout}.
handle_info({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
@@ -327,8 +334,10 @@ handle_info(timeout, State) ->
handle_info(emit_stats, State) ->
emit_stats(State),
- noreply([ensure_stats_timer],
- rabbit_event:reset_stats_timer(State, #ch.stats_timer));
+ State1 = rabbit_event:reset_stats_timer(State, #ch.stats_timer),
+ %% NB: don't call noreply/1 since we don't want to kick off the
+ %% stats timer.
+ {noreply, send_confirms(State1), hibernate};
handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
@@ -372,30 +381,11 @@ format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
%%---------------------------------------------------------------------------
-reply(Reply, NewState) -> reply(Reply, [], NewState).
-
-reply(Reply, Mask, NewState) -> reply(Reply, Mask, NewState, hibernate).
-
-reply(Reply, Mask, NewState, Timeout) ->
- {reply, Reply, next_state(Mask, NewState), Timeout}.
+reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.
-noreply(NewState) -> noreply([], NewState).
+noreply(NewState) -> {noreply, next_state(NewState), hibernate}.
-noreply(Mask, NewState) -> noreply(Mask, NewState, hibernate).
-
-noreply(Mask, NewState, Timeout) ->
- {noreply, next_state(Mask, NewState), Timeout}.
-
--define(MASKED_CALL(Fun, Mask, State),
- case lists:member(Fun, Mask) of
- true -> State;
- false -> Fun(State)
- end).
-
-next_state(Mask, State) ->
- State1 = ?MASKED_CALL(ensure_stats_timer, Mask, State),
- State2 = ?MASKED_CALL(send_confirms, Mask, State1),
- State2.
+next_state(State) -> ensure_stats_timer(send_confirms(State)).
ensure_stats_timer(State) ->
rabbit_event:ensure_stats_timer(State, #ch.stats_timer, emit_stats).
@@ -529,16 +519,12 @@ check_not_default_exchange(_) ->
%% check that an exchange/queue name does not contain the reserved
%% "amq." prefix.
%%
-%% One, quite reasonable, interpretation of the spec, taken by the
-%% QPid M1 Java client, is that the exclusion of "amq." prefixed names
+%% As per the AMQP 0-9-1 spec, the exclusion of "amq." prefixed names
%% only applies on actual creation, and not in the cases where the
-%% entity already exists. This is how we use this function in the code
-%% below. However, AMQP JIRA 123 changes that in 0-10, and possibly
-%% 0-9SP1, making it illegal to attempt to declare an exchange/queue
-%% with an amq.* name when passive=false. So this will need
-%% revisiting.
+%% entity already exists or passive=true.
%%
-%% TODO: enforce other constraints on name. See AMQP JIRA 69.
+%% NB: We deliberately do not enforce the other constraints on names
+%% required by the spec.
check_name(Kind, NameBin = <<"amq.", _/binary>>) ->
rabbit_misc:protocol_error(
access_refused,
@@ -559,11 +545,6 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-record_confirm(undefined, _, State) ->
- State;
-record_confirm(MsgSeqNo, XName, State) ->
- record_confirms([{MsgSeqNo, XName}], State).
-
record_confirms([], State) ->
State;
record_confirms(MXs, State = #ch{confirmed = C}) ->
@@ -603,8 +584,8 @@ handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) ->
%% while waiting for the reply to a synchronous command, we generally
%% do allow this...except in the case of a pending tx.commit, where
%% it could wreak havoc.
-handle_method(_Method, _, #ch{tx_status = TxStatus})
- when TxStatus =/= none andalso TxStatus =/= in_progress ->
+handle_method(_Method, _, #ch{tx = Tx})
+ when Tx =:= committing orelse Tx =:= failed ->
rabbit_misc:protocol_error(
channel_error, "unexpected command while processing 'tx.commit'", []);
@@ -618,7 +599,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory},
Content, State = #ch{virtual_host = VHostPath,
- tx_status = TxStatus,
+ tx = Tx,
confirm_enabled = ConfirmEnabled,
trace_state = TraceState}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
@@ -632,22 +613,22 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
check_user_id_header(Props, State),
check_expiration_header(Props),
{MsgSeqNo, State1} =
- case {TxStatus, ConfirmEnabled} of
+ case {Tx, ConfirmEnabled} of
{none, false} -> {undefined, State};
{_, _} -> SeqNo = State#ch.publish_seqno,
{SeqNo, State#ch{publish_seqno = SeqNo + 1}}
end,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
- rabbit_trace:tap_trace_in(Message, TraceState),
+ rabbit_trace:tap_in(Message, TraceState),
Delivery = rabbit_basic:delivery(Mandatory, Message, MsgSeqNo),
QNames = rabbit_exchange:route(Exchange, Delivery),
+ DQ = {Delivery, QNames},
{noreply,
- case TxStatus of
- none -> deliver_to_queues({Delivery, QNames}, State1);
- in_progress -> TMQ = State1#ch.uncommitted_message_q,
- NewTMQ = queue:in({Delivery, QNames}, TMQ),
- State1#ch{uncommitted_message_q = NewTMQ}
+ case Tx of
+ none -> deliver_to_queues(DQ, State1);
+ #tx{msgs = Msgs} -> Msgs1 = queue:in(DQ, Msgs),
+ State1#ch{tx = Tx#tx{msgs = Msgs1}}
end};
{error, Reason} ->
precondition_failed("invalid message: ~p", [Reason])
@@ -661,15 +642,14 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
- _, State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) ->
+ _, State = #ch{unacked_message_q = UAMQ, tx = Tx}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
State1 = State#ch{unacked_message_q = Remaining},
{noreply,
- case TxStatus of
- none -> ack(Acked, State1),
- State1;
- in_progress -> State1#ch{uncommitted_acks =
- Acked ++ State1#ch.uncommitted_acks}
+ case Tx of
+ none -> ack(Acked, State1),
+ State1;
+ #tx{acks = Acks} -> State1#ch{tx = Tx#tx{acks = Acked ++ Acks}}
end};
handle_method(#'basic.get'{queue = QueueNameBin,
@@ -1047,34 +1027,37 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) ->
precondition_failed("cannot switch from confirm to tx mode");
+handle_method(#'tx.select'{}, _, State = #ch{tx = none}) ->
+ {reply, #'tx.select_ok'{}, State#ch{tx = new_tx()}};
+
handle_method(#'tx.select'{}, _, State) ->
- {reply, #'tx.select_ok'{}, State#ch{tx_status = in_progress}};
+ {reply, #'tx.select_ok'{}, State};
-handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
+handle_method(#'tx.commit'{}, _, #ch{tx = none}) ->
precondition_failed("channel is not transactional");
-handle_method(#'tx.commit'{}, _,
- State = #ch{uncommitted_message_q = TMQ,
- uncommitted_acks = TAL,
- uncommitted_nacks = TNL,
- limiter = Limiter}) ->
- State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ),
- ack(TAL, State1),
+handle_method(#'tx.commit'{}, _, State = #ch{tx = #tx{msgs = Msgs,
+ acks = Acks,
+ nacks = Nacks},
+ limiter = Limiter}) ->
+ State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs),
+ ack(Acks, State1),
lists:foreach(
- fun({Requeue, Acked}) -> reject(Requeue, Acked, Limiter) end, TNL),
- {noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))};
+ fun({Requeue, Acked}) -> reject(Requeue, Acked, Limiter) end, Nacks),
+ {noreply, maybe_complete_tx(State1#ch{tx = committing})};
-handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
+handle_method(#'tx.rollback'{}, _, #ch{tx = none}) ->
precondition_failed("channel is not transactional");
handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
- uncommitted_acks = TAL,
- uncommitted_nacks = TNL}) ->
- TNL1 = lists:append([L || {_, L} <- TNL]),
- UAMQ1 = queue:from_list(lists:usort(TAL ++ TNL1 ++ queue:to_list(UAMQ))),
- {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = UAMQ1})};
-
-handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) ->
+ tx = #tx{acks = Acks,
+ nacks = Nacks}}) ->
+ NacksL = lists:append([L || {_, L} <- Nacks]),
+ UAMQ1 = queue:from_list(lists:usort(Acks ++ NacksL ++ queue:to_list(UAMQ))),
+ {reply, #'tx.rollback_ok'{}, State#ch{unacked_message_q = UAMQ1,
+ tx = new_tx()}};
+
+handle_method(#'confirm.select'{}, _, #ch{tx = #tx{}}) ->
precondition_failed("cannot switch from tx to confirm mode");
handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
@@ -1222,17 +1205,15 @@ basic_return(#basic_message{exchange_name = ExchangeName,
Content).
reject(DeliveryTag, Requeue, Multiple,
- State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) ->
+ State = #ch{unacked_message_q = UAMQ, tx = Tx}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
State1 = State#ch{unacked_message_q = Remaining},
{noreply,
- case TxStatus of
- none ->
- reject(Requeue, Acked, State1#ch.limiter),
- State1;
- in_progress ->
- State1#ch{uncommitted_nacks =
- [{Requeue, Acked} | State1#ch.uncommitted_nacks]}
+ case Tx of
+ none -> reject(Requeue, Acked, State1#ch.limiter),
+ State1;
+ #tx{nacks = Nacks} -> Nacks1 = [{Requeue, Acked} | Nacks],
+ State1#ch{tx = Tx#tx{nacks = Nacks1}}
end}.
reject(Requeue, Acked, Limiter) ->
@@ -1247,17 +1228,17 @@ record_sent(ConsumerTag, AckRequired,
State = #ch{unacked_message_q = UAMQ,
next_tag = DeliveryTag,
trace_state = TraceState}) ->
- incr_stats([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of
- {none, true} -> get;
- {none, false} -> get_no_ack;
- {_ , true} -> deliver;
- {_ , false} -> deliver_no_ack
- end, State),
+ ?INCR_STATS([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of
+ {none, true} -> get;
+ {none, false} -> get_no_ack;
+ {_ , true} -> deliver;
+ {_ , false} -> deliver_no_ack
+ end, State),
case Redelivered of
- true -> incr_stats([{queue_stats, QName, 1}], redeliver, State);
+ true -> ?INCR_STATS([{queue_stats, QName, 1}], redeliver, State);
false -> ok
end,
- rabbit_trace:tap_trace_out(Msg, TraceState),
+ rabbit_trace:tap_out(Msg, TraceState),
UAMQ1 = case AckRequired of
true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}},
UAMQ);
@@ -1298,11 +1279,9 @@ ack(Acked, State = #ch{queue_names = QNames}) ->
end
end, [], Acked),
ok = notify_limiter(State#ch.limiter, Acked),
- incr_stats(Incs, ack, State).
+ ?INCR_STATS(Incs, ack, State).
-new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
- uncommitted_acks = [],
- uncommitted_nacks = []}.
+new_tx() -> #tx{msgs = queue:new(), acks = [], nacks = []}.
notify_queues(State = #ch{state = closing}) ->
{ok, State};
@@ -1379,43 +1358,46 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
XName, MsgSeqNo, Message,
State#ch{queue_names = QNames1,
queue_monitors = QMons1}),
- incr_stats([{exchange_stats, XName, 1} |
- [{queue_exchange_stats, {QName, XName}, 1} ||
- QPid <- DeliveredQPids,
- {ok, QName} <- [dict:find(QPid, QNames1)]]],
- publish, State1),
+ ?INCR_STATS([{exchange_stats, XName, 1} |
+ [{queue_exchange_stats, {QName, XName}, 1} ||
+ QPid <- DeliveredQPids,
+ {ok, QName} <- [dict:find(QPid, QNames1)]]],
+ publish, State1),
State1.
-process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
- ok = basic_return(Msg, State, no_route),
- incr_stats([{exchange_stats, XName, 1}], return_unroutable, State),
- record_confirm(MsgSeqNo, XName, State);
-process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
- record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, _, _, undefined, _, State) ->
State;
+process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
+ record_confirms([{MsgSeqNo, XName}], State);
process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName,
- State#ch.unconfirmed)}.
+ State#ch.unconfirmed)};
+process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
+ ok = basic_return(Msg, State, no_route),
+ ?INCR_STATS([{exchange_stats, XName, 1}], return_unroutable, State),
+ case MsgSeqNo of
+ undefined -> State;
+ _ -> record_confirms([{MsgSeqNo, XName}], State)
+ end.
send_nacks([], State) ->
State;
-send_nacks(MXs, State = #ch{tx_status = none}) ->
+send_nacks(MXs, State = #ch{tx = none}) ->
coalesce_and_send([MsgSeqNo || {MsgSeqNo, _} <- MXs],
fun(MsgSeqNo, Multiple) ->
#'basic.nack'{delivery_tag = MsgSeqNo,
multiple = Multiple}
end, State);
send_nacks(_, State) ->
- maybe_complete_tx(State#ch{tx_status = failed}).
+ maybe_complete_tx(State#ch{tx = failed}).
-send_confirms(State = #ch{tx_status = none, confirmed = []}) ->
+send_confirms(State = #ch{tx = none, confirmed = []}) ->
State;
-send_confirms(State = #ch{tx_status = none, confirmed = C}) ->
+send_confirms(State = #ch{tx = none, confirmed = C}) ->
MsgSeqNos =
lists:foldl(
fun ({MsgSeqNo, XName}, MSNs) ->
- incr_stats([{exchange_stats, XName, 1}], confirm, State),
+ ?INCR_STATS([{exchange_stats, XName, 1}], confirm, State),
[MsgSeqNo | MSNs]
end, [], lists:append(C)),
send_confirms(MsgSeqNos, State#ch{confirmed = []});
@@ -1451,7 +1433,7 @@ coalesce_and_send(MsgSeqNos, MkMsgFun,
WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss],
State.
-maybe_complete_tx(State = #ch{tx_status = in_progress}) ->
+maybe_complete_tx(State = #ch{tx = #tx{}}) ->
State;
maybe_complete_tx(State = #ch{unconfirmed = UC}) ->
case dtree:is_empty(UC) of
@@ -1459,16 +1441,16 @@ maybe_complete_tx(State = #ch{unconfirmed = UC}) ->
true -> complete_tx(State#ch{confirmed = []})
end.
-complete_tx(State = #ch{tx_status = committing}) ->
+complete_tx(State = #ch{tx = committing}) ->
ok = rabbit_writer:send_command(State#ch.writer_pid, #'tx.commit_ok'{}),
- State#ch{tx_status = in_progress};
-complete_tx(State = #ch{tx_status = failed}) ->
+ State#ch{tx = new_tx()};
+complete_tx(State = #ch{tx = failed}) ->
{noreply, State1} = handle_exception(
rabbit_misc:amqp_error(
precondition_failed, "partial tx completion", [],
'tx.commit'),
State),
- State1#ch{tx_status = in_progress}.
+ State1#ch{tx = new_tx()}.
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
@@ -1477,19 +1459,16 @@ i(connection, #ch{conn_pid = ConnPid}) -> ConnPid;
i(number, #ch{channel = Channel}) -> Channel;
i(user, #ch{user = User}) -> User#user.username;
i(vhost, #ch{virtual_host = VHost}) -> VHost;
-i(transactional, #ch{tx_status = TE}) -> TE =/= none;
+i(transactional, #ch{tx = Tx}) -> Tx =/= none;
i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(name, State) -> name(State);
-i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
- dict:size(ConsumerMapping);
-i(messages_unconfirmed, #ch{unconfirmed = UC}) ->
- dtree:size(UC);
-i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) ->
- queue:len(UAMQ);
-i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) ->
- queue:len(TMQ);
-i(acks_uncommitted, #ch{uncommitted_acks = TAL}) ->
- length(TAL);
+i(consumer_count, #ch{consumer_mapping = CM}) -> dict:size(CM);
+i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC);
+i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ);
+i(messages_uncommitted, #ch{tx = #tx{msgs = Msgs}}) -> queue:len(Msgs);
+i(messages_uncommitted, #ch{}) -> 0;
+i(acks_uncommitted, #ch{tx = #tx{acks = Acks}}) -> length(Acks);
+i(acks_uncommitted, #ch{}) -> 0;
i(prefetch_count, #ch{limiter = Limiter}) ->
rabbit_limiter:get_limit(Limiter);
i(client_flow_blocked, #ch{limiter = Limiter}) ->
@@ -1500,12 +1479,8 @@ i(Item, _) ->
name(#ch{conn_name = ConnName, channel = Channel}) ->
list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])).
-incr_stats(Incs, Measure, State) ->
- case rabbit_event:stats_level(State, #ch.stats_timer) of
- fine -> [update_measures(Type, Key, Inc, Measure) ||
- {Type, Key, Inc} <- Incs];
- _ -> ok
- end.
+incr_stats(Incs, Measure) ->
+ [update_measures(Type, Key, Inc, Measure) || {Type, Key, Inc} <- Incs].
update_measures(Type, Key, Inc, Measure) ->
Measures = case get({Type, Key}) of
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index e72cbafe..9339161f 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -310,22 +310,19 @@ route(#exchange{name = #resource{name = <<"">>, virtual_host = VHost}},
[rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)];
route(X = #exchange{name = XName}, Delivery) ->
- route1(Delivery, {queue:from_list([X]), XName, []}).
-
-route1(Delivery, {WorkList, SeenXs, QNames}) ->
- case queue:out(WorkList) of
- {empty, _WorkList} ->
- lists:usort(QNames);
- {{value, X = #exchange{type = Type}}, WorkList1} ->
- DstNames = process_alternate(
- X, ((type_to_module(Type)):route(X, Delivery))),
- route1(Delivery,
- lists:foldl(fun process_route/2, {WorkList1, SeenXs, QNames},
- DstNames))
- end.
+ route1(Delivery, {[X], XName, []}).
+
+route1(_, {[], _, QNames}) ->
+ lists:usort(QNames);
+route1(Delivery, {[X = #exchange{type = Type} | WorkList], SeenXs, QNames}) ->
+ DstNames = process_alternate(
+ X, ((type_to_module(Type)):route(X, Delivery))),
+ route1(Delivery,
+ lists:foldl(fun process_route/2, {WorkList, SeenXs, QNames},
+ DstNames)).
process_alternate(#exchange{arguments = []}, Results) -> %% optimisation
- Results;
+ Results;
process_alternate(#exchange{name = XName, arguments = Args}, []) ->
case rabbit_misc:r_arg(XName, exchange, Args, <<"alternate-exchange">>) of
undefined -> [];
@@ -339,23 +336,25 @@ process_route(#resource{kind = exchange} = XName,
Acc;
process_route(#resource{kind = exchange} = XName,
{WorkList, #resource{kind = exchange} = SeenX, QNames}) ->
- {case lookup(XName) of
- {ok, X} -> queue:in(X, WorkList);
- {error, not_found} -> WorkList
- end, gb_sets:from_list([SeenX, XName]), QNames};
+ {cons_if_present(XName, WorkList),
+ gb_sets:from_list([SeenX, XName]), QNames};
process_route(#resource{kind = exchange} = XName,
{WorkList, SeenXs, QNames} = Acc) ->
case gb_sets:is_element(XName, SeenXs) of
true -> Acc;
- false -> {case lookup(XName) of
- {ok, X} -> queue:in(X, WorkList);
- {error, not_found} -> WorkList
- end, gb_sets:add_element(XName, SeenXs), QNames}
+ false -> {cons_if_present(XName, WorkList),
+ gb_sets:add_element(XName, SeenXs), QNames}
end;
process_route(#resource{kind = queue} = QName,
{WorkList, SeenXs, QNames}) ->
{WorkList, SeenXs, [QName | QNames]}.
+cons_if_present(XName, L) ->
+ case lookup(XName) of
+ {ok, X} -> [X | L];
+ {error, not_found} -> L
+ end.
+
call_with_exchange(XName, Fun) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> case mnesia:read({rabbit_exchange, XName}) of
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index cedbbdb3..8ee9ad5b 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -104,8 +104,6 @@ advance_blocks({B1, B2, B3, B4}, I) ->
B5 = erlang:phash2({B1, I}, 4294967296),
{{(B2 bxor B5), (B3 bxor B5), (B4 bxor B5), B5}, I+1}.
-blocks_to_binary({B1, B2, B3, B4}) -> <<B1:32, B2:32, B3:32, B4:32>>.
-
%% generate a GUID. This function should be used when performance is a
%% priority and predictability is not an issue. Otherwise use
%% gen_secure/0.
@@ -114,14 +112,15 @@ gen() ->
%% time we need a new guid we rotate them producing a new hash
%% with the aid of the counter. Look at the comments in
%% advance_blocks/2 for details.
- {BS, I} = case get(guid) of
- undefined -> <<B1:32, B2:32, B3:32, B4:32>> =
- erlang:md5(term_to_binary(fresh())),
- {{B1,B2,B3,B4}, 0};
- {BS0, I0} -> advance_blocks(BS0, I0)
- end,
- put(guid, {BS, I}),
- blocks_to_binary(BS).
+ case get(guid) of
+ undefined -> <<B1:32, B2:32, B3:32, B4:32>> = Res =
+ erlang:md5(term_to_binary(fresh())),
+ put(guid, {{B1, B2, B3, B4}, 0}),
+ Res;
+ {BS, I} -> {{B1, B2, B3, B4}, _} = S = advance_blocks(BS, I),
+ put(guid, S),
+ <<B1:32, B2:32, B3:32, B4:32>>
+ end.
%% generate a non-predictable GUID.
%%
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 70df62e2..0df7ea1c 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -18,11 +18,11 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/5, publish_delivered/4,
- discard/3, fetch/2, drop/2, ack/2,
- requeue/2, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1,
- dropwhile/3, set_ram_duration_target/2, ram_duration/1,
+ discard/3, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3,
+ len/1, is_empty/1, depth/1, drain_confirmed/1,
+ dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2, foreach_ack/3]).
+ status/1, invoke/3, is_duplicate/2]).
-export([start/1, stop/0]).
@@ -41,7 +41,6 @@
backing_queue_state,
seen_status,
confirmed,
- ack_msg_id,
known_senders
}).
@@ -58,7 +57,6 @@
backing_queue_state :: any(),
seen_status :: dict(),
confirmed :: [rabbit_guid:guid()],
- ack_msg_id :: dict(),
known_senders :: set()
}).
@@ -119,7 +117,6 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
backing_queue_state = BQS,
seen_status = dict:new(),
confirmed = [],
- ack_msg_id = dict:new(),
known_senders = sets:new() }.
stop_mirroring(State = #state { coordinator = CPid,
@@ -212,13 +209,11 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
ChPid, State = #state { gm = GM,
seen_status = SS,
backing_queue = BQ,
- backing_queue_state = BQS,
- ack_msg_id = AM }) ->
+ backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}),
{AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS),
- AM1 = maybe_store_acktag(AckTag, MsgId, AM),
- State1 = State #state { backing_queue_state = BQS1, ack_msg_id = AM1 },
+ State1 = State #state { backing_queue_state = BQS1 },
{AckTag, ensure_monitoring(ChPid, State1)}.
discard(MsgId, ChPid, State = #state { gm = GM,
@@ -241,19 +236,17 @@ discard(MsgId, ChPid, State = #state { gm = GM,
State
end.
-dropwhile(Pred, AckRequired,
- State = #state{gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS }) ->
+dropwhile(Pred, State = #state{backing_queue = BQ,
+ backing_queue_state = BQS }) ->
Len = BQ:len(BQS),
- {Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
- Len1 = BQ:len(BQS1),
- Dropped = Len - Len1,
- case Dropped of
- 0 -> ok;
- _ -> ok = gm:broadcast(GM, {drop, Len1, Dropped, AckRequired})
- end,
- {Next, Msgs, State #state { backing_queue_state = BQS1 } }.
+ {Next, BQS1} = BQ:dropwhile(Pred, BQS),
+ {Next, drop(Len, false, State #state { backing_queue_state = BQS1 })}.
+
+fetchwhile(Pred, Fun, Acc, State = #state{backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ Len = BQ:len(BQS),
+ {Next, Acc1, BQS1} = BQ:fetchwhile(Pred, Fun, Acc, BQS),
+ {Next, Acc1, drop(Len, true, State #state { backing_queue_state = BQS1 })}.
drain_confirmed(State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -289,38 +282,29 @@ fetch(AckRequired, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
State1 = State #state { backing_queue_state = BQS1 },
- case Result of
- empty ->
- {Result, State1};
- {#basic_message{id = MsgId}, _IsDelivered, AckTag} ->
- {Result, drop(MsgId, AckTag, State1)}
- end.
+ {Result, case Result of
+ empty -> State1;
+ {_MsgId, _IsDelivered, AckTag} -> drop_one(AckTag, State1)
+ end}.
drop(AckRequired, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
{Result, BQS1} = BQ:drop(AckRequired, BQS),
State1 = State #state { backing_queue_state = BQS1 },
{Result, case Result of
- empty -> State1;
- {MsgId, AckTag} -> drop(MsgId, AckTag, State1)
+ empty -> State1;
+ {_MsgId, AckTag} -> drop_one(AckTag, State1)
end}.
ack(AckTags, State = #state { gm = GM,
backing_queue = BQ,
- backing_queue_state = BQS,
- ack_msg_id = AM }) ->
+ backing_queue_state = BQS }) ->
{MsgIds, BQS1} = BQ:ack(AckTags, BQS),
case MsgIds of
[] -> ok;
_ -> ok = gm:broadcast(GM, {ack, MsgIds})
end,
- AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
- {MsgIds, State #state { backing_queue_state = BQS1,
- ack_msg_id = AM1 }}.
-
-foreach_ack(MsgFun, State = #state { backing_queue = BQ,
- backing_queue_state = BQS }, AckTags) ->
- State #state { backing_queue_state = BQ:foreach_ack(MsgFun, BQS, AckTags) }.
+ {MsgIds, State #state { backing_queue_state = BQS1 }}.
requeue(AckTags, State = #state { gm = GM,
backing_queue = BQ,
@@ -329,6 +313,11 @@ requeue(AckTags, State = #state { gm = GM,
ok = gm:broadcast(GM, {requeue, MsgIds}),
{MsgIds, State #state { backing_queue_state = BQS1 }}.
+ackfold(MsgFun, Acc, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }, AckTags) ->
+ {Acc1, BQS1} = BQ:ackfold(MsgFun, Acc, BQS, AckTags),
+ {Acc1, State #state { backing_queue_state = BQS1 }}.
+
fold(Fun, Acc, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
{Result, BQS1} = BQ:fold(Fun, Acc, BQS),
@@ -434,7 +423,6 @@ promote_backing_queue_state(QName, CPid, BQ, BQS, GM, AckTags, Seen, KS) ->
backing_queue_state = BQS1,
seen_status = Seen,
confirmed = [],
- ack_msg_id = dict:new(),
known_senders = sets:from_list(KS) }.
sender_death_fun() ->
@@ -466,15 +454,21 @@ depth_fun() ->
%% Helpers
%% ---------------------------------------------------------------------------
-drop(MsgId, AckTag, State = #state { ack_msg_id = AM,
- gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS }) ->
+drop_one(AckTag, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckTag =/= undefined}),
- State #state { ack_msg_id = maybe_store_acktag(AckTag, MsgId, AM) }.
+ State.
-maybe_store_acktag(undefined, _MsgId, AM) -> AM;
-maybe_store_acktag(AckTag, MsgId, AM) -> dict:store(AckTag, MsgId, AM).
+drop(PrevLen, AckRequired, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ Len = BQ:len(BQS),
+ case PrevLen - Len of
+ 0 -> State;
+ Dropped -> ok = gm:broadcast(GM, {drop, Len, Dropped, AckRequired}),
+ State
+ end.
ensure_monitoring(ChPid, State = #state { coordinator = CPid,
known_senders = KS }) ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index ccf3d8be..ce3e3802 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -46,6 +46,7 @@
-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1]).
-export([version_compare/2, version_compare/3]).
+-export([version_minor_equivalent/2]).
-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]).
-export([gb_trees_fold/3, gb_trees_foreach/2]).
-export([parse_arguments/3]).
@@ -192,6 +193,7 @@
-spec(version_compare/3 ::
(string(), string(), ('lt' | 'lte' | 'eq' | 'gte' | 'gt'))
-> boolean()).
+-spec(version_minor_equivalent/2 :: (string(), string()) -> boolean()).
-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()).
-spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()).
-spec(gb_trees_cons/3 :: (any(), any(), gb_tree()) -> gb_tree()).
@@ -351,13 +353,12 @@ set_table_value(Table, Key, Type, Value) ->
sort_field_table(
lists:keystore(Key, 1, Table, {Key, Type, Value})).
-r(#resource{virtual_host = VHostPath}, Kind, Name)
- when is_binary(Name) ->
+r(#resource{virtual_host = VHostPath}, Kind, Name) ->
#resource{virtual_host = VHostPath, kind = Kind, name = Name};
-r(VHostPath, Kind, Name) when is_binary(Name) andalso is_binary(VHostPath) ->
+r(VHostPath, Kind, Name) ->
#resource{virtual_host = VHostPath, kind = Kind, name = Name}.
-r(VHostPath, Kind) when is_binary(VHostPath) ->
+r(VHostPath, Kind) ->
#resource{virtual_host = VHostPath, kind = Kind, name = '_'}.
r_arg(#resource{virtual_host = VHostPath}, Kind, Table, Key) ->
@@ -735,6 +736,16 @@ version_compare(A, B) ->
ANum > BNum -> gt
end.
+%% a.b.c and a.b.d match, but a.b.c and a.d.e don't. If
+%% versions do not match that pattern, just compare them.
+version_minor_equivalent(A, B) ->
+ {ok, RE} = re:compile("^(\\d+\\.\\d+)(\\.\\d+)\$"),
+ Opts = [{capture, all_but_first, list}],
+ case {re:run(A, RE, Opts), re:run(B, RE, Opts)} of
+ {{match, [A1|_]}, {match, [B1|_]}} -> A1 =:= B1;
+ _ -> A =:= B
+ end.
+
dropdot(A) -> lists:dropwhile(fun (X) -> X =:= $. end, A).
dict_cons(Key, Value, Dict) ->
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 942048f9..6a442fec 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -68,7 +68,8 @@
%% Various queries to get the status of the db
-spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} |
- {'running_nodes', [node()]}]).
+ {'running_nodes', [node()]} |
+ {'partitions', [{node(), [node()]}]}]).
-spec(is_clustered/0 :: () -> boolean()).
-spec(cluster_nodes/1 :: ('all' | 'disc' | 'ram' | 'running') -> [node()]).
-spec(node_type/0 :: () -> node_type()).
@@ -757,9 +758,16 @@ check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) ->
[node(), Node, Node])}}
end.
-check_version_consistency(This, Remote, _) when This =:= Remote ->
- ok;
check_version_consistency(This, Remote, Name) ->
+ check_version_consistency(This, Remote, Name, fun (A, B) -> A =:= B end).
+
+check_version_consistency(This, Remote, Name, Comp) ->
+ case Comp(This, Remote) of
+ true -> ok;
+ false -> version_error(Name, This, Remote)
+ end.
+
+version_error(Name, This, Remote) ->
{error, {inconsistent_cluster,
rabbit_misc:format("~s version mismatch: local node is ~s, "
"remote node ~s", [Name, This, Remote])}}.
@@ -767,8 +775,15 @@ check_version_consistency(This, Remote, Name) ->
check_otp_consistency(Remote) ->
check_version_consistency(erlang:system_info(otp_release), Remote, "OTP").
+%% Unlike the rest of 3.0.x, 3.0.0 is not compatible. This can be
+%% removed after 3.1.0 is released.
+check_rabbit_consistency("3.0.0") ->
+ version_error("Rabbit", rabbit_misc:version(), "3.0.0");
+
check_rabbit_consistency(Remote) ->
- check_version_consistency(rabbit_misc:version(), Remote, "Rabbit").
+ check_version_consistency(
+ rabbit_misc:version(), Remote, "Rabbit",
+ fun rabbit_misc:version_minor_equivalent/2).
%% This is fairly tricky. We want to know if the node is in the state
%% that a `reset' would leave it in. We cannot simply check if the
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 8d0e4456..258ac0ce 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -53,7 +53,7 @@
-spec(notify_joined_cluster/0 :: () -> 'ok').
-spec(notify_left_cluster/1 :: (node()) -> 'ok').
--spec(partitions/0 :: () -> {node(), [{atom(), node()}]}).
+-spec(partitions/0 :: () -> {node(), [node()]}).
-endif.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 928786e9..13e8feff 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -35,12 +35,16 @@
%%--------------------------------------------------------------------------
--record(v1, {parent, sock, name, connection, callback, recv_len, pending_recv,
+-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, queue_collector, heartbeater, stats_timer,
- channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len,
- auth_mechanism, auth_state, conserve_resources,
- last_blocked_by, last_blocked_at, host, peer_host,
- port, peer_port}).
+ channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, throttle}).
+
+-record(connection, {name, host, peer_host, port, peer_port,
+ protocol, user, timeout_sec, frame_max, vhost,
+ client_properties, capabilities,
+ auth_mechanism, auth_state}).
+
+-record(throttle, {conserve_resources, last_blocked_by, last_blocked_at}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, last_blocked_by, last_blocked_age,
@@ -205,15 +209,21 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
{PeerHost, PeerPort, Host, Port} = socket_ends(Sock),
State = #v1{parent = Parent,
sock = ClientSock,
- name = list_to_binary(Name),
connection = #connection{
+ name = list_to_binary(Name),
+ host = Host,
+ peer_host = PeerHost,
+ port = Port,
+ peer_port = PeerPort,
protocol = none,
user = none,
timeout_sec = ?HANDSHAKE_TIMEOUT,
frame_max = ?FRAME_MIN_SIZE,
vhost = none,
client_properties = none,
- capabilities = []},
+ capabilities = [],
+ auth_mechanism = none,
+ auth_state = none},
callback = uninitialized_callback,
recv_len = 0,
pending_recv = false,
@@ -224,15 +234,10 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
start_heartbeat_fun = StartHeartbeatFun,
buf = [],
buf_len = 0,
- auth_mechanism = none,
- auth_state = none,
- conserve_resources = false,
- last_blocked_by = none,
- last_blocked_at = never,
- host = Host,
- peer_host = PeerHost,
- port = Port,
- peer_port = PeerPort},
+ throttle = #throttle{
+ conserve_resources = false,
+ last_blocked_by = none,
+ last_blocked_at = never}},
try
ok = inet_op(fun () -> rabbit_net:tune_buffer_size(ClientSock) end),
recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
@@ -288,8 +293,10 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
{other, Other} -> handle_other(Other, Deb, State)
end.
-handle_other({conserve_resources, Conserve}, Deb, State) ->
- recvloop(Deb, control_throttle(State#v1{conserve_resources = Conserve}));
+handle_other({conserve_resources, Conserve}, Deb,
+ State = #v1{throttle = Throttle}) ->
+ Throttle1 = Throttle#throttle{conserve_resources = Conserve},
+ recvloop(Deb, control_throttle(State#v1{throttle = Throttle1}));
handle_other({channel_closing, ChPid}, Deb, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
channel_cleanup(ChPid),
@@ -372,29 +379,31 @@ terminate(Explanation, State) when ?IS_RUNNING(State) ->
terminate(_Explanation, State) ->
{force, State}.
-control_throttle(State = #v1{connection_state = CS,
- conserve_resources = Mem}) ->
- case {CS, Mem orelse credit_flow:blocked()} of
+control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) ->
+ case {CS, (Throttle#throttle.conserve_resources orelse
+ credit_flow:blocked())} of
{running, true} -> State#v1{connection_state = blocking};
{blocking, false} -> State#v1{connection_state = running};
{blocked, false} -> ok = rabbit_heartbeat:resume_monitor(
State#v1.heartbeater),
State#v1{connection_state = running};
- {blocked, true} -> update_last_blocked_by(State);
+ {blocked, true} -> State#v1{throttle = update_last_blocked_by(
+ Throttle)};
{_, _} -> State
end.
-maybe_block(State = #v1{connection_state = blocking}) ->
+maybe_block(State = #v1{connection_state = blocking, throttle = Throttle}) ->
ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater),
- update_last_blocked_by(State#v1{connection_state = blocked,
- last_blocked_at = erlang:now()});
+ State#v1{connection_state = blocked,
+ throttle = update_last_blocked_by(
+ Throttle#throttle{last_blocked_at = erlang:now()})};
maybe_block(State) ->
State.
-update_last_blocked_by(State = #v1{conserve_resources = true}) ->
- State#v1{last_blocked_by = resource};
-update_last_blocked_by(State = #v1{conserve_resources = false}) ->
- State#v1{last_blocked_by = flow}.
+update_last_blocked_by(Throttle = #throttle{conserve_resources = true}) ->
+ Throttle#throttle{last_blocked_by = resource};
+update_last_blocked_by(Throttle = #throttle{conserve_resources = false}) ->
+ Throttle#throttle{last_blocked_by = flow}.
%%--------------------------------------------------------------------------
%% error handling / termination
@@ -531,9 +540,10 @@ payload_snippet(<<Snippet:16/binary, _/binary>>) ->
%%--------------------------------------------------------------------------
create_channel(Channel, State) ->
- #v1{sock = Sock, name = Name, queue_collector = Collector,
+ #v1{sock = Sock, queue_collector = Collector,
channel_sup_sup_pid = ChanSupSup,
- connection = #connection{protocol = Protocol,
+ connection = #connection{name = Name,
+ protocol = Protocol,
frame_max = FrameMax,
user = User,
vhost = VHost,
@@ -594,40 +604,36 @@ handle_frame(Type, Channel, Payload, State) ->
unexpected_frame(Type, Channel, Payload, State).
process_frame(Frame, Channel, State) ->
- {ChPid, AState} = case get({channel, Channel}) of
+ ChKey = {channel, Channel},
+ {ChPid, AState} = case get(ChKey) of
undefined -> create_channel(Channel, State);
Other -> Other
end,
- case process_channel_frame(Frame, ChPid, AState) of
- {ok, NewAState} -> put({channel, Channel}, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State);
- {error, Reason} -> handle_exception(State, Channel, Reason)
- end.
-
-process_channel_frame(Frame, ChPid, AState) ->
case rabbit_command_assembler:process(Frame, AState) of
- {ok, NewAState} -> {ok, NewAState};
- {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
- {ok, NewAState};
- {ok, Method, Content, NewAState} -> rabbit_channel:do_flow(
- ChPid, Method, Content),
- {ok, NewAState};
- {error, Reason} -> {error, Reason}
+ {ok, NewAState} ->
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ {ok, Method, NewAState} ->
+ rabbit_channel:do(ChPid, Method),
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ {ok, Method, Content, NewAState} ->
+ rabbit_channel:do_flow(ChPid, Method, Content),
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, control_throttle(State));
+ {error, Reason} ->
+ handle_exception(State, Channel, Reason)
end.
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
channel_cleanup(ChPid),
- control_throttle(State);
-post_process_frame({method, MethodName, _}, _ChPid,
- State = #v1{connection = #connection{
- protocol = Protocol}}) ->
- case Protocol:method_has_content(MethodName) of
- true -> erlang:bump_reductions(2000),
- maybe_block(control_throttle(State));
- false -> control_throttle(State)
- end;
+ State;
+post_process_frame({content_header, _, _, _, _}, _ChPid, State) ->
+ maybe_block(State);
+post_process_frame({content_body, _}, _ChPid, State) ->
+ maybe_block(State);
post_process_frame(_Frame, _ChPid, State) ->
- control_throttle(State).
+ State.
%%--------------------------------------------------------------------------
@@ -746,13 +752,13 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
{table, Capabilities1} -> Capabilities1;
_ -> []
end,
- State = State0#v1{auth_mechanism = AuthMechanism,
- auth_state = AuthMechanism:init(Sock),
- connection_state = securing,
+ State = State0#v1{connection_state = securing,
connection =
Connection#connection{
client_properties = ClientProperties,
- capabilities = Capabilities}},
+ capabilities = Capabilities,
+ auth_mechanism = AuthMechanism,
+ auth_state = AuthMechanism:init(Sock)}},
auth_phase(Response, State);
handle_method0(#'connection.secure_ok'{response = Response},
@@ -790,10 +796,11 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
handle_method0(#'connection.open'{virtual_host = VHostPath},
State = #v1{connection_state = opening,
- connection = Connection = #connection{
- user = User,
- protocol = Protocol},
- sock = Sock}) ->
+ connection = Connection = #connection{
+ user = User,
+ protocol = Protocol},
+ sock = Sock,
+ throttle = Throttle}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
@@ -801,7 +808,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
State1 = control_throttle(
State#v1{connection_state = running,
connection = NewConnection,
- conserve_resources = Conserve}),
+ throttle = Throttle#throttle{
+ conserve_resources = Conserve}}),
rabbit_event:notify(connection_created,
[{type, network} |
infos(?CREATION_EVENT_KEYS, State1)]),
@@ -870,10 +878,10 @@ auth_mechanisms_binary(Sock) ->
string:join([atom_to_list(A) || A <- auth_mechanisms(Sock)], " ")).
auth_phase(Response,
- State = #v1{auth_mechanism = AuthMechanism,
- auth_state = AuthState,
- connection = Connection =
- #connection{protocol = Protocol},
+ State = #v1{connection = Connection =
+ #connection{protocol = Protocol,
+ auth_mechanism = AuthMechanism,
+ auth_state = AuthState},
sock = Sock}) ->
case AuthMechanism:handle_response(Response, AuthState) of
{refused, Msg, Args} ->
@@ -886,14 +894,16 @@ auth_phase(Response,
{challenge, Challenge, AuthState1} ->
Secure = #'connection.secure'{challenge = Challenge},
ok = send_on_channel0(Sock, Secure, Protocol),
- State#v1{auth_state = AuthState1};
+ State#v1{connection = Connection#connection{
+ auth_state = AuthState1}};
{ok, User} ->
Tune = #'connection.tune'{channel_max = 0,
frame_max = server_frame_max(),
heartbeat = server_heartbeat()},
ok = send_on_channel0(Sock, Tune, Protocol),
State#v1{connection_state = tuning,
- connection = Connection#connection{user = User}}
+ connection = Connection#connection{user = User,
+ auth_state = none}}
end.
%%--------------------------------------------------------------------------
@@ -901,11 +911,6 @@ auth_phase(Response,
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, #v1{}) -> self();
-i(name, #v1{name = Name}) -> Name;
-i(host, #v1{host = Host}) -> Host;
-i(peer_host, #v1{peer_host = PeerHost}) -> PeerHost;
-i(port, #v1{port = Port}) -> Port;
-i(peer_port, #v1{peer_port = PeerPort}) -> PeerPort;
i(SockStat, S) when SockStat =:= recv_oct;
SockStat =:= recv_cnt;
SockStat =:= send_oct;
@@ -922,36 +927,32 @@ i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S);
i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S);
i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S);
i(state, #v1{connection_state = CS}) -> CS;
-i(last_blocked_by, #v1{last_blocked_by = By}) -> By;
-i(last_blocked_age, #v1{last_blocked_at = never}) ->
+i(last_blocked_by, #v1{throttle = #throttle{last_blocked_by = By}}) -> By;
+i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = never}}) ->
infinity;
-i(last_blocked_age, #v1{last_blocked_at = T}) ->
+i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = T}}) ->
timer:now_diff(erlang:now(), T) / 1000000;
i(channels, #v1{}) -> length(all_channels());
-i(auth_mechanism, #v1{auth_mechanism = none}) ->
+i(Item, #v1{connection = Conn}) -> ic(Item, Conn).
+
+ic(name, #connection{name = Name}) -> Name;
+ic(host, #connection{host = Host}) -> Host;
+ic(peer_host, #connection{peer_host = PeerHost}) -> PeerHost;
+ic(port, #connection{port = Port}) -> Port;
+ic(peer_port, #connection{peer_port = PeerPort}) -> PeerPort;
+ic(protocol, #connection{protocol = none}) -> none;
+ic(protocol, #connection{protocol = P}) -> P:version();
+ic(user, #connection{user = none}) -> '';
+ic(user, #connection{user = U}) -> U#user.username;
+ic(vhost, #connection{vhost = VHost}) -> VHost;
+ic(timeout, #connection{timeout_sec = Timeout}) -> Timeout;
+ic(frame_max, #connection{frame_max = FrameMax}) -> FrameMax;
+ic(client_properties, #connection{client_properties = CP}) -> CP;
+ic(auth_mechanism, #connection{auth_mechanism = none}) ->
none;
-i(auth_mechanism, #v1{auth_mechanism = Mechanism}) ->
+ic(auth_mechanism, #connection{auth_mechanism = Mechanism}) ->
proplists:get_value(name, Mechanism:description());
-i(protocol, #v1{connection = #connection{protocol = none}}) ->
- none;
-i(protocol, #v1{connection = #connection{protocol = Protocol}}) ->
- Protocol:version();
-i(user, #v1{connection = #connection{user = none}}) ->
- '';
-i(user, #v1{connection = #connection{user = #user{
- username = Username}}}) ->
- Username;
-i(vhost, #v1{connection = #connection{vhost = VHost}}) ->
- VHost;
-i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) ->
- Timeout;
-i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) ->
- FrameMax;
-i(client_properties, #v1{connection = #connection{client_properties =
- ClientProperties}}) ->
- ClientProperties;
-i(Item, #v1{}) ->
- throw({bad_argument, Item}).
+ic(Item, #connection{}) -> throw({bad_argument, Item}).
socket_info(Get, Select, #v1{sock = Sock}) ->
case Get(Sock) of
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index df8544a4..09ed3d08 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -38,6 +38,7 @@ all_tests() ->
passed = mirrored_supervisor_tests:all_tests(),
application:set_env(rabbit, file_handles_high_watermark, 10, infinity),
ok = file_handle_cache:set_limit(10),
+ passed = test_version_equivalance(),
passed = test_multi_call(),
passed = test_file_handle_cache(),
passed = test_backing_queue(),
@@ -141,6 +142,16 @@ run_cluster_dependent_tests(SecondaryNode) ->
passed.
+test_version_equivalance() ->
+ true = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.0"),
+ true = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.1"),
+ true = rabbit_misc:version_minor_equivalent("%%VSN%%", "%%VSN%%"),
+ false = rabbit_misc:version_minor_equivalent("3.0.0", "3.1.0"),
+ false = rabbit_misc:version_minor_equivalent("3.0.0", "3.0"),
+ false = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.0.1"),
+ false = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.foo"),
+ passed.
+
test_multi_call() ->
Fun = fun() ->
receive
@@ -2307,8 +2318,9 @@ test_variable_queue() ->
fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1,
fun test_drop/1,
fun test_variable_queue_fold_msg_on_disk/1,
- fun test_dropwhile/1,
+ fun test_dropfetchwhile/1,
fun test_dropwhile_varying_ram_duration/1,
+ fun test_fetchwhile_varying_ram_duration/1,
fun test_variable_queue_ack_limiting/1,
fun test_variable_queue_requeue/1,
fun test_variable_queue_fold/1]],
@@ -2409,41 +2421,70 @@ test_drop(VQ0) ->
true = rabbit_variable_queue:is_empty(VQ5),
VQ5.
-test_dropwhile(VQ0) ->
+test_dropfetchwhile(VQ0) ->
Count = 10,
%% add messages with sequential expiry
VQ1 = variable_queue_publish(
false, Count,
- fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0),
+ fun (N, Props) -> Props#message_properties{expiry = N} end,
+ fun erlang:term_to_binary/1, VQ0),
+
+ %% fetch the first 5 messages
+ {#message_properties{expiry = 6}, {Msgs, AckTags}, VQ2} =
+ rabbit_variable_queue:fetchwhile(
+ fun (#message_properties{expiry = Expiry}) -> Expiry =< 5 end,
+ fun (Msg, AckTag, {MsgAcc, AckAcc}) ->
+ {[Msg | MsgAcc], [AckTag | AckAcc]}
+ end, {[], []}, VQ1),
+ true = lists:seq(1, 5) == [msg2int(M) || M <- lists:reverse(Msgs)],
+
+ %% requeue them
+ {_MsgIds, VQ3} = rabbit_variable_queue:requeue(AckTags, VQ2),
%% drop the first 5 messages
- {_, undefined, VQ2} = rabbit_variable_queue:dropwhile(
- fun(#message_properties { expiry = Expiry }) ->
- Expiry =< 5
- end, false, VQ1),
-
- %% fetch five now
- VQ3 = lists:foldl(fun (_N, VQN) ->
- {{#basic_message{}, _, _}, VQM} =
+ {#message_properties{expiry = 6}, VQ4} =
+ rabbit_variable_queue:dropwhile(
+ fun (#message_properties {expiry = Expiry}) -> Expiry =< 5 end, VQ3),
+
+ %% fetch 5
+ VQ5 = lists:foldl(fun (N, VQN) ->
+ {{Msg, _, _}, VQM} =
rabbit_variable_queue:fetch(false, VQN),
+ true = msg2int(Msg) == N,
VQM
- end, VQ2, lists:seq(6, Count)),
+ end, VQ4, lists:seq(6, Count)),
%% should be empty now
- {empty, VQ4} = rabbit_variable_queue:fetch(false, VQ3),
+ true = rabbit_variable_queue:is_empty(VQ5),
- VQ4.
+ VQ5.
test_dropwhile_varying_ram_duration(VQ0) ->
+ test_dropfetchwhile_varying_ram_duration(
+ fun (VQ1) ->
+ {_, VQ2} = rabbit_variable_queue:dropwhile(
+ fun (_) -> false end, VQ1),
+ VQ2
+ end, VQ0).
+
+test_fetchwhile_varying_ram_duration(VQ0) ->
+ test_dropfetchwhile_varying_ram_duration(
+ fun (VQ1) ->
+ {_, ok, VQ2} = rabbit_variable_queue:fetchwhile(
+ fun (_) -> false end,
+ fun (_, _, A) -> A end,
+ ok, VQ1),
+ VQ2
+ end, VQ0).
+
+test_dropfetchwhile_varying_ram_duration(Fun, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1),
- {_, undefined, VQ3} = rabbit_variable_queue:dropwhile(
- fun(_) -> false end, false, VQ2),
+ VQ3 = Fun(VQ2),
VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
VQ5 = variable_queue_publish(false, 1, VQ4),
- {_, undefined, VQ6} =
- rabbit_variable_queue:dropwhile(fun(_) -> false end, false, VQ5),
+ VQ6 = Fun(VQ5),
VQ6.
test_variable_queue_dynamic_duration_change(VQ0) ->
@@ -2567,8 +2608,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
test_variable_queue_fold_msg_on_disk(VQ0) ->
VQ1 = variable_queue_publish(true, 1, VQ0),
{VQ2, AckTags} = variable_queue_fetch(1, true, false, 1, VQ1),
- VQ3 = rabbit_variable_queue:foreach_ack(fun (_M, _A) -> ok end,
- VQ2, AckTags),
+ {ok, VQ3} = rabbit_variable_queue:ackfold(fun (_M, _A, ok) -> ok end,
+ ok, VQ2, AckTags),
VQ3.
test_queue_recover() ->
diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl
index 3a5b96de..601656da 100644
--- a/src/rabbit_trace.erl
+++ b/src/rabbit_trace.erl
@@ -16,7 +16,7 @@
-module(rabbit_trace).
--export([init/1, tracing/1, tap_trace_in/2, tap_trace_out/2, start/1, stop/1]).
+-export([init/1, enabled/1, tap_in/2, tap_out/2, start/1, stop/1]).
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
@@ -31,9 +31,9 @@
-type(state() :: rabbit_types:exchange() | 'none').
-spec(init/1 :: (rabbit_types:vhost()) -> state()).
--spec(tracing/1 :: (rabbit_types:vhost()) -> boolean()).
--spec(tap_trace_in/2 :: (rabbit_types:basic_message(), state()) -> 'ok').
--spec(tap_trace_out/2 :: (rabbit_amqqueue:qmsg(), state()) -> 'ok').
+-spec(enabled/1 :: (rabbit_types:vhost()) -> boolean()).
+-spec(tap_in/2 :: (rabbit_types:basic_message(), state()) -> 'ok').
+-spec(tap_out/2 :: (rabbit_amqqueue:qmsg(), state()) -> 'ok').
-spec(start/1 :: (rabbit_types:vhost()) -> 'ok').
-spec(stop/1 :: (rabbit_types:vhost()) -> 'ok').
@@ -43,26 +43,26 @@
%%----------------------------------------------------------------------------
init(VHost) ->
- case tracing(VHost) of
+ case enabled(VHost) of
false -> none;
true -> {ok, X} = rabbit_exchange:lookup(
rabbit_misc:r(VHost, exchange, ?XNAME)),
X
end.
-tracing(VHost) ->
+enabled(VHost) ->
{ok, VHosts} = application:get_env(rabbit, ?TRACE_VHOSTS),
lists:member(VHost, VHosts).
-tap_trace_in(Msg = #basic_message{exchange_name = #resource{name = XName}},
- TraceX) ->
- maybe_trace(TraceX, Msg, <<"publish">>, XName, []).
+tap_in(_Msg, none) -> ok;
+tap_in(Msg = #basic_message{exchange_name = #resource{name = XName}}, TraceX) ->
+ trace(TraceX, Msg, <<"publish">>, XName, []).
-tap_trace_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg},
- TraceX) ->
+tap_out(_Msg, none) -> ok;
+tap_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg}, TraceX) ->
RedeliveredNum = case Redelivered of true -> 1; false -> 0 end,
- maybe_trace(TraceX, Msg, <<"deliver">>, QName,
- [{<<"redelivered">>, signedint, RedeliveredNum}]).
+ trace(TraceX, Msg, <<"deliver">>, QName,
+ [{<<"redelivered">>, signedint, RedeliveredNum}]).
%%----------------------------------------------------------------------------
@@ -83,14 +83,11 @@ update_config(Fun) ->
%%----------------------------------------------------------------------------
-maybe_trace(none, _Msg, _RKPrefix, _RKSuffix, _Extra) ->
+trace(#exchange{name = Name}, #basic_message{exchange_name = Name},
+ _RKPrefix, _RKSuffix, _Extra) ->
ok;
-maybe_trace(#exchange{name = Name}, #basic_message{exchange_name = Name},
- _RKPrefix, _RKSuffix, _Extra) ->
- ok;
-maybe_trace(X, Msg = #basic_message{content = #content{
- payload_fragments_rev = PFR}},
- RKPrefix, RKSuffix, Extra) ->
+trace(X, Msg = #basic_message{content = #content{payload_fragments_rev = PFR}},
+ RKPrefix, RKSuffix, Extra) ->
{ok, _, _} = rabbit_basic:publish(
X, <<RKPrefix/binary, ".", RKSuffix/binary>>,
#'P_basic'{headers = msg_to_table(Msg) ++ Extra}, PFR),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 30ab96f5..37ca6de0 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -18,10 +18,11 @@
-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
publish/5, publish_delivered/4, discard/3, drain_confirmed/1,
- dropwhile/3, fetch/2, drop/2, ack/2, requeue/2, fold/3, len/1,
+ dropwhile/2, fetchwhile/4,
+ fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1,
is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
- is_duplicate/2, multiple_routing_keys/0, foreach_ack/3]).
+ is_duplicate/2, multiple_routing_keys/0]).
-export([start/1, stop/0]).
@@ -577,27 +578,28 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
confirmed = gb_sets:new() }}
end.
-dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []).
+dropwhile(Pred, State) ->
+ case queue_out(State) of
+ {empty, State1} ->
+ {undefined, a(State1)};
+ {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
+ case Pred(MsgProps) of
+ true -> {_, State2} = remove(false, MsgStatus, State1),
+ dropwhile(Pred, State2);
+ false -> {MsgProps, a(in_r(MsgStatus, State1))}
+ end
+ end.
-dropwhile(Pred, AckRequired, State, Msgs) ->
- End = fun(Next, S) when AckRequired -> {Next, lists:reverse(Msgs), S};
- (Next, S) -> {Next, undefined, S}
- end,
+fetchwhile(Pred, Fun, Acc, State) ->
case queue_out(State) of
{empty, State1} ->
- End(undefined, a(State1));
+ {undefined, Acc, a(State1)};
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
- case {Pred(MsgProps), AckRequired} of
- {true, true} ->
- {MsgStatus1, State2} = read_msg(MsgStatus, State1),
- {{Msg, _IsDelivered, AckTag}, State3} =
- internal_fetch(true, MsgStatus1, State2),
- dropwhile(Pred, AckRequired, State3, [{Msg, AckTag} | Msgs]);
- {true, false} ->
- {_, State2} = internal_fetch(false, MsgStatus, State1),
- dropwhile(Pred, AckRequired, State2, undefined);
- {false, _} ->
- End(MsgProps, a(in_r(MsgStatus, State1)))
+ case Pred(MsgProps) of
+ true -> {Msg, State2} = read_msg(MsgStatus, false, State1),
+ {AckTag, State3} = remove(true, MsgStatus, State2),
+ fetchwhile(Pred, Fun, Fun(Msg, AckTag, Acc), State3);
+ false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))}
end
end.
@@ -608,9 +610,9 @@ fetch(AckRequired, State) ->
{{value, MsgStatus}, State1} ->
%% it is possible that the message wasn't read from disk
%% at this point, so read it in.
- {MsgStatus1, State2} = read_msg(MsgStatus, State1),
- {Res, State3} = internal_fetch(AckRequired, MsgStatus1, State2),
- {Res, a(State3)}
+ {Msg, State2} = read_msg(MsgStatus, false, State1),
+ {AckTag, State3} = remove(AckRequired, MsgStatus, State2),
+ {{Msg, MsgStatus#msg_status.is_delivered, AckTag}, a(State3)}
end.
drop(AckRequired, State) ->
@@ -618,8 +620,7 @@ drop(AckRequired, State) ->
{empty, State1} ->
{empty, a(State1)};
{{value, MsgStatus}, State1} ->
- {{_Msg, _IsDelivered, AckTag}, State2} =
- internal_fetch(AckRequired, MsgStatus, State1),
+ {AckTag, State2} = remove(AckRequired, MsgStatus, State1),
{{MsgStatus#msg_status.msg_id, AckTag}, a(State2)}
end.
@@ -646,16 +647,6 @@ ack(AckTags, State) ->
persistent_count = PCount1,
ack_out_counter = AckOutCount + length(AckTags) })}.
-foreach_ack(undefined, State, _AckTags) ->
- State;
-foreach_ack(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) ->
- a(lists:foldl(fun(SeqId, State1) ->
- {MsgStatus, State2} =
- read_msg(gb_trees:get(SeqId, PA), false, State1),
- MsgFun(MsgStatus#msg_status.msg, SeqId),
- State2
- end, State, AckTags)).
-
requeue(AckTags, #vqstate { delta = Delta,
q3 = Q3,
q4 = Q4,
@@ -677,6 +668,16 @@ requeue(AckTags, #vqstate { delta = Delta,
in_counter = InCounter + MsgCount,
len = Len + MsgCount }))}.
+ackfold(MsgFun, Acc, State, AckTags) ->
+ {AccN, StateN} =
+ lists:foldl(
+ fun(SeqId, {Acc0, State0 = #vqstate{ pending_ack = PA }}) ->
+ MsgStatus = gb_trees:get(SeqId, PA),
+ {Msg, State1} = read_msg(MsgStatus, false, State0),
+ {MsgFun(Msg, SeqId, Acc0), State1}
+ end, {Acc, State}, AckTags),
+ {AccN, a(StateN)}.
+
fold(Fun, Acc, #vqstate { q1 = Q1,
q2 = Q2,
delta = #delta { start_seq_id = DeltaSeqId,
@@ -684,9 +685,9 @@ fold(Fun, Acc, #vqstate { q1 = Q1,
q3 = Q3,
q4 = Q4 } = State) ->
QFun = fun(MsgStatus, {Acc0, State0}) ->
- {#msg_status { msg = Msg, msg_props = MsgProps }, State1 } =
- read_msg(MsgStatus, false, State0),
- {StopGo, AccNext} = Fun(Msg, MsgProps, Acc0),
+ {Msg, State1} = read_msg(MsgStatus, false, State0),
+ {StopGo, AccNext} =
+ Fun(Msg, MsgStatus#msg_status.msg_props, Acc0),
{StopGo, {AccNext, State1}}
end,
{Cont1, {Acc1, State1}} = qfoldl(QFun, {cont, {Acc, State }}, Q4),
@@ -1071,9 +1072,10 @@ in_r(MsgStatus = #msg_status { msg = undefined },
State = #vqstate { q3 = Q3, q4 = Q4 }) ->
case ?QUEUE:is_empty(Q4) of
true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
- false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} =
- read_msg(MsgStatus, State),
- State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) }
+ false -> {Msg, State1 = #vqstate { q4 = Q4a }} =
+ read_msg(MsgStatus, true, State),
+ State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
+ msg = Msg }, Q4a) }
end;
in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }.
@@ -1089,35 +1091,33 @@ queue_out(State = #vqstate { q4 = Q4 }) ->
{{value, MsgStatus}, State #vqstate { q4 = Q4a }}
end.
-read_msg(MsgStatus, State) -> read_msg(MsgStatus, true, State).
-
-read_msg(MsgStatus = #msg_status { msg = undefined,
- msg_id = MsgId,
- is_persistent = IsPersistent },
+read_msg(#msg_status { msg = undefined,
+ msg_id = MsgId,
+ is_persistent = IsPersistent },
CountDiskToRam, State = #vqstate { ram_msg_count = RamMsgCount,
msg_store_clients = MSCState}) ->
{{ok, Msg = #basic_message {}}, MSCState1} =
msg_store_read(MSCState, IsPersistent, MsgId),
- {MsgStatus #msg_status { msg = Msg },
- State #vqstate { ram_msg_count = RamMsgCount + one_if(CountDiskToRam),
- msg_store_clients = MSCState1 }};
-read_msg(MsgStatus, _CountDiskToRam, State) ->
- {MsgStatus, State}.
-
-internal_fetch(AckRequired, MsgStatus = #msg_status {
- seq_id = SeqId,
- msg_id = MsgId,
- msg = Msg,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_on_disk = MsgOnDisk,
- index_on_disk = IndexOnDisk },
- State = #vqstate {ram_msg_count = RamMsgCount,
- out_counter = OutCount,
- index_state = IndexState,
- msg_store_clients = MSCState,
- len = Len,
- persistent_count = PCount }) ->
+ RamMsgCount1 = RamMsgCount + one_if(CountDiskToRam),
+ {Msg, State #vqstate { ram_msg_count = RamMsgCount1,
+ msg_store_clients = MSCState1 }};
+read_msg(#msg_status { msg = Msg }, _CountDiskToRam, State) ->
+ {Msg, State}.
+
+remove(AckRequired, MsgStatus = #msg_status {
+ seq_id = SeqId,
+ msg_id = MsgId,
+ msg = Msg,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = MsgOnDisk,
+ index_on_disk = IndexOnDisk },
+ State = #vqstate {ram_msg_count = RamMsgCount,
+ out_counter = OutCount,
+ index_state = IndexState,
+ msg_store_clients = MSCState,
+ len = Len,
+ persistent_count = PCount }) ->
%% 1. Mark it delivered if necessary
IndexState1 = maybe_write_delivered(
IndexOnDisk andalso not IsDelivered,
@@ -1128,12 +1128,11 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
ok = msg_store_remove(MSCState, IsPersistent, [MsgId])
end,
Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
- IndexState2 =
- case {AckRequired, MsgOnDisk, IndexOnDisk} of
- {false, true, false} -> Rem(), IndexState1;
- {false, true, true} -> Rem(), Ack();
- _ -> IndexState1
- end,
+ IndexState2 = case {AckRequired, MsgOnDisk, IndexOnDisk} of
+ {false, true, false} -> Rem(), IndexState1;
+ {false, true, true} -> Rem(), Ack();
+ _ -> IndexState1
+ end,
%% 3. If an ack is required, add something sensible to PA
{AckTag, State1} = case AckRequired of
@@ -1144,15 +1143,14 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
false -> {undefined, State}
end,
- PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
+ PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
- {{Msg, IsDelivered, AckTag},
- State1 #vqstate { ram_msg_count = RamMsgCount1,
- out_counter = OutCount + 1,
- index_state = IndexState2,
- len = Len - 1,
- persistent_count = PCount1 }}.
+ {AckTag, State1 #vqstate { ram_msg_count = RamMsgCount1,
+ out_counter = OutCount + 1,
+ index_state = IndexState2,
+ len = Len - 1,
+ persistent_count = PCount1 }}.
purge_betas_and_deltas(LensByStore,
State = #vqstate { q3 = Q3,
@@ -1373,7 +1371,8 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
%%----------------------------------------------------------------------------
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
- read_msg(MsgStatus, State);
+ {Msg, State1} = read_msg(MsgStatus, true, State),
+ {MsgStatus#msg_status { msg = Msg }, State1};
publish_alpha(MsgStatus, #vqstate {ram_msg_count = RamMsgCount } = State) ->
{MsgStatus, State #vqstate { ram_msg_count = RamMsgCount + 1 }}.
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 297fa56f..0bb18f4c 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -123,7 +123,7 @@ with(VHostPath, Thunk) ->
infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items].
i(name, VHost) -> VHost;
-i(tracing, VHost) -> rabbit_trace:tracing(VHost);
+i(tracing, VHost) -> rabbit_trace:enabled(VHost);
i(Item, _) -> throw({bad_argument, Item}).
info(VHost) -> infos(?INFO_KEYS, VHost).
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index 53f3df18..db674f91 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -84,7 +84,15 @@ sup_memory(Sup) ->
sup_children(Sup) ->
rabbit_misc:with_exit_handler(
- rabbit_misc:const([]), fun () -> supervisor:which_children(Sup) end).
+ rabbit_misc:const([]),
+ fun () ->
+ %% Just in case we end up talking to something that is
+ %% not a supervisor by mistake.
+ case supervisor:which_children(Sup) of
+ L when is_list(L) -> L;
+ _ -> []
+ end
+ end).
pid_memory(Pid) when is_pid(Pid) -> case process_info(Pid, memory) of
{memory, M} -> M;
@@ -119,10 +127,13 @@ plugin_memory() ->
is_plugin(atom_to_list(App))]).
plugin_memory(App) ->
- case catch application_master:get_child(
- application_controller:get_master(App)) of
- {Pid, _} -> sup_memory(Pid);
- _ -> 0
+ case application_controller:get_master(App) of
+ undefined -> 0;
+ Master -> case application_master:get_child(Master) of
+ {Pid, _} when is_pid(Pid) -> sup_memory(Pid);
+ Pid when is_pid(Pid) -> sup_memory(Pid);
+ _ -> 0
+ end
end.
is_plugin("rabbitmq_" ++ _) -> true;