summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-31 12:24:54 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-31 12:24:54 +0000
commit00881c0e78aa14be4e3c353dd08f82f7ae3812a1 (patch)
tree0be8205c9bde86f976ef6937941501cfc86e7caf
parenta39bc21538e580476fca3b5a9252dd360ea82147 (diff)
parent1c50d72ccdd59c94b1973093749528c899a26bf1 (diff)
downloadrabbitmq-server-00881c0e78aa14be4e3c353dd08f82f7ae3812a1.tar.gz
merge default into bug25428bug25428
-rw-r--r--Makefile2
-rwxr-xr-xcheck_xref14
-rw-r--r--docs/rabbitmqctl.1.xml55
-rw-r--r--include/rabbit.hrl7
-rw-r--r--src/credit_flow.erl75
-rw-r--r--src/delegate.erl12
-rw-r--r--src/gen_server2.erl92
-rw-r--r--src/pmon.erl6
-rw-r--r--src/rabbit.erl144
-rw-r--r--src/rabbit_alarm.erl17
-rw-r--r--src/rabbit_amqqueue.erl54
-rw-r--r--src/rabbit_amqqueue_process.erl463
-rw-r--r--src/rabbit_auth_backend_internal.erl10
-rw-r--r--src/rabbit_backing_queue.erl73
-rw-r--r--src/rabbit_backing_queue_qc.erl113
-rw-r--r--src/rabbit_channel.erl489
-rw-r--r--src/rabbit_connection_sup.erl7
-rw-r--r--src/rabbit_control_main.erl32
-rw-r--r--src/rabbit_event.erl12
-rw-r--r--src/rabbit_exchange.erl65
-rw-r--r--src/rabbit_exchange_type_invalid.erl4
-rw-r--r--src/rabbit_guid.erl19
-rw-r--r--src/rabbit_mirror_queue_master.erl206
-rw-r--r--src/rabbit_mirror_queue_slave.erl116
-rw-r--r--src/rabbit_mirror_queue_sync.erl260
-rw-r--r--src/rabbit_misc.erl63
-rw-r--r--src/rabbit_mnesia.erl18
-rw-r--r--src/rabbit_msg_store.erl13
-rw-r--r--src/rabbit_networking.erl20
-rw-r--r--src/rabbit_node_monitor.erl2
-rw-r--r--src/rabbit_plugins.erl9
-rw-r--r--src/rabbit_queue_index.erl12
-rw-r--r--src/rabbit_reader.erl309
-rw-r--r--src/rabbit_tests.erl301
-rw-r--r--src/rabbit_trace.erl37
-rw-r--r--src/rabbit_variable_queue.erl491
-rw-r--r--src/rabbit_vhost.erl2
37 files changed, 2240 insertions, 1384 deletions
diff --git a/Makefile b/Makefile
index c63e3dfd..bf33b931 100644
--- a/Makefile
+++ b/Makefile
@@ -162,7 +162,7 @@ $(BASIC_PLT): $(BEAM_TARGETS)
else \
dialyzer --output_plt $@ --build_plt \
--apps erts kernel stdlib compiler sasl os_mon mnesia tools \
- public_key crypto ssl; \
+ public_key crypto ssl xmerl; \
fi
clean:
diff --git a/check_xref b/check_xref
index e0c049f8..df019311 100755
--- a/check_xref
+++ b/check_xref
@@ -50,6 +50,7 @@ shutdown(Rc, LibDir) ->
check(Cwd, PluginsDir, LibDir, Checks) ->
{ok, Plugins} = file:list_dir(PluginsDir),
ok = file:make_dir(LibDir),
+ put({?MODULE, third_party}, []),
[begin
Source = filename:join(PluginsDir, Plugin),
Target = filename:join(LibDir, Plugin),
@@ -162,7 +163,8 @@ filters() ->
filter_chain(FnChain) ->
fun(AnalysisResult) ->
- lists:foldl(fun(F, false) -> F(cleanup(AnalysisResult));
+ Result = cleanup(AnalysisResult),
+ lists:foldl(fun(F, false) -> F(Result);
(_F, true) -> true
end, false, FnChain)
end.
@@ -267,14 +269,8 @@ source_file(M) ->
store_third_party(App) ->
{ok, AppConfig} = application:get_all_key(App),
- case get({?MODULE, third_party}) of
- undefined ->
- put({?MODULE, third_party},
- proplists:get_value(modules, AppConfig));
- Modules ->
- put({?MODULE, third_party},
- proplists:get_value(modules, AppConfig) ++ Modules)
- end.
+ AppModules = proplists:get_value(modules, AppConfig),
+ put({?MODULE, third_party}, AppModules ++ get({?MODULE, third_party})).
%% TODO: this ought not to be maintained in such a fashion
external_dependency(Path) ->
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 34947b66..bbd2fe5b 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -446,6 +446,55 @@
</para>
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>sync_queue</command> <arg choice="req">queue</arg></cmdsynopsis>
+ </term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>queue</term>
+ <listitem>
+ <para>
+ The name of the queue to synchronise.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ Instructs a mirrored queue with unsynchronised slaves to
+ synchronise itself. The queue will block while
+ synchronisation takes place (all publishers to and
+ consumers from the queue will block). The queue must be
+ mirrored for this command to succeed.
+ </para>
+ <para>
+ Note that unsynchronised queues from which messages are
+ being drained will become synchronised eventually. This
+ command is primarily useful for queues which are not
+ being drained.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>cancel_sync_queue</command> <arg choice="req">queue</arg></cmdsynopsis>
+ </term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>queue</term>
+ <listitem>
+ <para>
+ The name of the queue to cancel synchronisation for.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ Instructs a synchronising mirrored queue to stop
+ synchronising itself.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</refsect2>
@@ -1109,6 +1158,12 @@
i.e. those which could take over from the master without
message loss.</para></listitem>
</varlistentry>
+ <varlistentry>
+ <term>status</term>
+ <listitem><para>The status of the queue. Normally
+ 'running', but may be "{syncing, MsgCount}" if the queue is
+ synchronising.</para></listitem>
+ </varlistentry>
</variablelist>
<para>
If no <command>queueinfoitem</command>s are specified then queue name and depth are
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 19766a00..eeee799e 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -27,9 +27,6 @@
-record(vhost, {virtual_host, dummy}).
--record(connection, {protocol, user, timeout_sec, frame_max, vhost,
- client_properties, capabilities}).
-
-record(content,
{class_id,
properties, %% either 'none', or a decoded record/tuple
@@ -78,8 +75,7 @@
-record(event, {type, props, timestamp}).
--record(message_properties, {expiry, needs_confirming = false,
- delivered = false}).
+-record(message_properties, {expiry, needs_confirming = false}).
-record(plugin, {name, %% atom()
version, %% string()
@@ -92,7 +88,6 @@
-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2013 VMware, Inc.").
-define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/").
--define(PROTOCOL_VERSION, "AMQP 0-9-1 / 0-9 / 0-8").
-define(ERTS_MINIMUM, "5.6.3").
%% EMPTY_FRAME_SIZE, 8 = 1 + 2 + 4 + 1
diff --git a/src/credit_flow.erl b/src/credit_flow.erl
index 9ffaf247..106179fd 100644
--- a/src/credit_flow.erl
+++ b/src/credit_flow.erl
@@ -52,6 +52,22 @@
%%----------------------------------------------------------------------------
+%% process dict update macro - eliminates the performance-hurting
+%% closure creation a HOF would introduce
+-define(UPDATE(Key, Default, Var, Expr),
+ begin
+ %% We deliberately allow Var to escape from the case here
+ %% to be used in Expr. Any temporary var we introduced
+ %% would also escape, and might conflict.
+ case get(Key) of
+ undefined -> Var = Default;
+ Var -> ok
+ end,
+ put(Key, Expr)
+ end).
+
+%%----------------------------------------------------------------------------
+
%% There are two "flows" here; of messages and of credit, going in
%% opposite directions. The variable names "From" and "To" refer to
%% the flow of credit, but the function names refer to the flow of
@@ -66,29 +82,33 @@
send(From) -> send(From, ?DEFAULT_CREDIT).
send(From, {InitialCredit, _MoreCreditAfter}) ->
- update({credit_from, From}, InitialCredit,
- fun (1) -> block(From),
- 0;
- (C) -> C - 1
- end).
+ ?UPDATE({credit_from, From}, InitialCredit, C,
+ if C == 1 -> block(From),
+ 0;
+ true -> C - 1
+ end).
ack(To) -> ack(To, ?DEFAULT_CREDIT).
ack(To, {_InitialCredit, MoreCreditAfter}) ->
- update({credit_to, To}, MoreCreditAfter,
- fun (1) -> grant(To, MoreCreditAfter),
- MoreCreditAfter;
- (C) -> C - 1
- end).
+ ?UPDATE({credit_to, To}, MoreCreditAfter, C,
+ if C == 1 -> grant(To, MoreCreditAfter),
+ MoreCreditAfter;
+ true -> C - 1
+ end).
handle_bump_msg({From, MoreCredit}) ->
- update({credit_from, From}, 0,
- fun (C) when C =< 0 andalso C + MoreCredit > 0 -> unblock(From),
- C + MoreCredit;
- (C) -> C + MoreCredit
- end).
-
-blocked() -> get(credit_blocked, []) =/= [].
+ ?UPDATE({credit_from, From}, 0, C,
+ if C =< 0 andalso C + MoreCredit > 0 -> unblock(From),
+ C + MoreCredit;
+ true -> C + MoreCredit
+ end).
+
+blocked() -> case get(credit_blocked) of
+ undefined -> false;
+ [] -> false;
+ _ -> true
+ end.
peer_down(Peer) ->
%% In theory we could also remove it from credit_deferred here, but it
@@ -105,24 +125,17 @@ grant(To, Quantity) ->
Msg = {bump_credit, {self(), Quantity}},
case blocked() of
false -> To ! Msg;
- true -> update(credit_deferred, [],
- fun (Deferred) -> [{To, Msg} | Deferred] end)
+ true -> ?UPDATE(credit_deferred, [], Deferred, [{To, Msg} | Deferred])
end.
-block(From) -> update(credit_blocked, [], fun (Blocks) -> [From | Blocks] end).
+block(From) -> ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]).
unblock(From) ->
- update(credit_blocked, [], fun (Blocks) -> Blocks -- [From] end),
+ ?UPDATE(credit_blocked, [], Blocks, Blocks -- [From]),
case blocked() of
- false -> [To ! Msg || {To, Msg} <- get(credit_deferred, [])],
- erase(credit_deferred);
+ false -> case erase(credit_deferred) of
+ undefined -> ok;
+ Credits -> [To ! Msg || {To, Msg} <- Credits]
+ end;
true -> ok
end.
-
-get(Key, Default) ->
- case get(Key) of
- undefined -> Default;
- Value -> Value
- end.
-
-update(Key, Default, Fun) -> put(Key, Fun(get(Key, Default))), ok.
diff --git a/src/delegate.erl b/src/delegate.erl
index b622dc6b..e833b819 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -62,6 +62,13 @@ invoke(Pid, Fun) when is_pid(Pid) ->
erlang:raise(Class, Reason, StackTrace)
end;
+invoke([], _Fun) -> %% optimisation
+ {[], []};
+invoke([Pid], Fun) when node(Pid) =:= node() -> %% optimisation
+ case safe_invoke(Pid, Fun) of
+ {ok, _, Result} -> {[{Pid, Result}], []};
+ {error, _, Error} -> {[], [{Pid, Error}]}
+ end;
invoke(Pids, Fun) when is_list(Pids) ->
{LocalPids, Grouped} = group_pids_by_node(Pids),
%% The use of multi_call is only safe because the timeout is
@@ -90,6 +97,11 @@ invoke_no_result(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() ->
invoke_no_result(Pid, Fun) when is_pid(Pid) ->
invoke_no_result([Pid], Fun);
+invoke_no_result([], _Fun) -> %% optimisation
+ ok;
+invoke_no_result([Pid], Fun) when node(Pid) =:= node() -> %% optimisation
+ safe_invoke(Pid, Fun), %% must not die
+ ok;
invoke_no_result(Pids, Fun) when is_list(Pids) ->
{LocalPids, Grouped} = group_pids_by_node(Pids),
case orddict:fetch_keys(Grouped) of
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 4056e3d9..c82327a2 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -196,8 +196,7 @@
%% State record
-record(gs2_state, {parent, name, state, mod, time,
- timeout_state, queue, debug, prioritise_call,
- prioritise_cast, prioritise_info}).
+ timeout_state, queue, debug, prioritisers}).
-ifdef(use_specs).
@@ -638,17 +637,17 @@ adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO,
{backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}.
in({'$gen_cast', Msg} = Input,
- GS2State = #gs2_state { prioritise_cast = PC }) ->
- in(Input, PC(Msg, GS2State), GS2State);
+ GS2State = #gs2_state { prioritisers = {_, F, _} }) ->
+ in(Input, F(Msg, GS2State), GS2State);
in({'$gen_call', From, Msg} = Input,
- GS2State = #gs2_state { prioritise_call = PC }) ->
- in(Input, PC(Msg, From, GS2State), GS2State);
+ GS2State = #gs2_state { prioritisers = {F, _, _} }) ->
+ in(Input, F(Msg, From, GS2State), GS2State);
in({'EXIT', Parent, _R} = Input, GS2State = #gs2_state { parent = Parent }) ->
in(Input, infinity, GS2State);
in({system, _From, _Req} = Input, GS2State) ->
in(Input, infinity, GS2State);
-in(Input, GS2State = #gs2_state { prioritise_info = PI }) ->
- in(Input, PI(Input, GS2State), GS2State).
+in(Input, GS2State = #gs2_state { prioritisers = {_, _, F} }) ->
+ in(Input, F(Input, GS2State), GS2State).
in(Input, Priority, GS2State = #gs2_state { queue = Queue }) ->
GS2State # gs2_state { queue = priority_queue:in(Input, Priority, Queue) }.
@@ -864,13 +863,19 @@ dispatch(Info, Mod, State) ->
common_reply(_Name, From, Reply, _NState, [] = _Debug) ->
reply(From, Reply),
[];
-common_reply(Name, From, Reply, NState, Debug) ->
- reply(Name, From, Reply, NState, Debug).
+common_reply(Name, {To, _Tag} = From, Reply, NState, Debug) ->
+ reply(From, Reply),
+ sys:handle_debug(Debug, fun print_event/3, Name, {out, Reply, To, NState}).
+
+common_noreply(_Name, _NState, [] = _Debug) ->
+ [];
+common_noreply(Name, NState, Debug) ->
+ sys:handle_debug(Debug, fun print_event/3, Name, {noreply, NState}).
-common_debug([] = _Debug, _Func, _Info, _Event) ->
+common_become(_Name, _Mod, _NState, [] = _Debug) ->
[];
-common_debug(Debug, Func, Info, Event) ->
- sys:handle_debug(Debug, Func, Info, Event).
+common_become(Name, Mod, NState, Debug) ->
+ sys:handle_debug(Debug, fun print_event/3, Name, {become, Mod, NState}).
handle_msg({'$gen_call', From, Msg}, GS2State = #gs2_state { mod = Mod,
state = State,
@@ -887,23 +892,11 @@ handle_msg({'$gen_call', From, Msg}, GS2State = #gs2_state { mod = Mod,
loop(GS2State #gs2_state { state = NState,
time = Time1,
debug = Debug1});
- {noreply, NState} ->
- Debug1 = common_debug(Debug, fun print_event/3, Name,
- {noreply, NState}),
- loop(GS2State #gs2_state {state = NState,
- time = infinity,
- debug = Debug1});
- {noreply, NState, Time1} ->
- Debug1 = common_debug(Debug, fun print_event/3, Name,
- {noreply, NState}),
- loop(GS2State #gs2_state {state = NState,
- time = Time1,
- debug = Debug1});
{stop, Reason, Reply, NState} ->
{'EXIT', R} =
(catch terminate(Reason, Msg,
GS2State #gs2_state { state = NState })),
- reply(Name, From, Reply, NState, Debug),
+ common_reply(Name, From, Reply, NState, Debug),
exit(R);
Other ->
handle_common_reply(Other, Msg, GS2State)
@@ -916,28 +909,24 @@ handle_common_reply(Reply, Msg, GS2State = #gs2_state { name = Name,
debug = Debug}) ->
case Reply of
{noreply, NState} ->
- Debug1 = common_debug(Debug, fun print_event/3, Name,
- {noreply, NState}),
- loop(GS2State #gs2_state { state = NState,
- time = infinity,
- debug = Debug1 });
+ Debug1 = common_noreply(Name, NState, Debug),
+ loop(GS2State #gs2_state {state = NState,
+ time = infinity,
+ debug = Debug1});
{noreply, NState, Time1} ->
- Debug1 = common_debug(Debug, fun print_event/3, Name,
- {noreply, NState}),
- loop(GS2State #gs2_state { state = NState,
- time = Time1,
- debug = Debug1 });
+ Debug1 = common_noreply(Name, NState, Debug),
+ loop(GS2State #gs2_state {state = NState,
+ time = Time1,
+ debug = Debug1});
{become, Mod, NState} ->
- Debug1 = common_debug(Debug, fun print_event/3, Name,
- {become, Mod, NState}),
+ Debug1 = common_become(Name, Mod, NState, Debug),
loop(find_prioritisers(
GS2State #gs2_state { mod = Mod,
state = NState,
time = infinity,
debug = Debug1 }));
{become, Mod, NState, Time1} ->
- Debug1 = common_debug(Debug, fun print_event/3, Name,
- {become, Mod, NState}),
+ Debug1 = common_become(Name, Mod, NState, Debug),
loop(find_prioritisers(
GS2State #gs2_state { mod = Mod,
state = NState,
@@ -957,12 +946,6 @@ handle_common_termination(Reply, Msg, GS2State) ->
terminate({bad_return_value, Reply}, Msg, GS2State)
end.
-reply(Name, {To, Tag}, Reply, State, Debug) ->
- reply({To, Tag}, Reply),
- sys:handle_debug(
- Debug, fun print_event/3, Name, {out, Reply, To, State}).
-
-
%%-----------------------------------------------------------------
%% Callback functions for system messages handling.
%%-----------------------------------------------------------------
@@ -1165,16 +1148,13 @@ whereis_name(Name) ->
end.
find_prioritisers(GS2State = #gs2_state { mod = Mod }) ->
- PrioriCall = function_exported_or_default(
- Mod, 'prioritise_call', 3,
- fun (_Msg, _From, _State) -> 0 end),
- PrioriCast = function_exported_or_default(Mod, 'prioritise_cast', 2,
- fun (_Msg, _State) -> 0 end),
- PrioriInfo = function_exported_or_default(Mod, 'prioritise_info', 2,
- fun (_Msg, _State) -> 0 end),
- GS2State #gs2_state { prioritise_call = PrioriCall,
- prioritise_cast = PrioriCast,
- prioritise_info = PrioriInfo }.
+ PCall = function_exported_or_default(Mod, 'prioritise_call', 3,
+ fun (_Msg, _From, _State) -> 0 end),
+ PCast = function_exported_or_default(Mod, 'prioritise_cast', 2,
+ fun (_Msg, _State) -> 0 end),
+ PInfo = function_exported_or_default(Mod, 'prioritise_info', 2,
+ fun (_Msg, _State) -> 0 end),
+ GS2State #gs2_state { prioritisers = {PCall, PCast, PInfo} }.
function_exported_or_default(Mod, Fun, Arity, Default) ->
case erlang:function_exported(Mod, Fun, Arity) of
diff --git a/src/pmon.erl b/src/pmon.erl
index 54c3fc34..ed32b8b2 100644
--- a/src/pmon.erl
+++ b/src/pmon.erl
@@ -19,6 +19,8 @@
-export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2,
monitored/1, is_empty/1]).
+-compile({no_auto_import, [monitor/2]}).
+
-ifdef(use_specs).
%%----------------------------------------------------------------------------
@@ -48,7 +50,9 @@ monitor(Item, M) ->
false -> dict:store(Item, erlang:monitor(process, Item), M)
end.
-monitor_all(Items, M) -> lists:foldl(fun monitor/2, M, Items).
+monitor_all([], M) -> M; %% optimisation
+monitor_all([Item], M) -> monitor(Item, M); %% optimisation
+monitor_all(Items, M) -> lists:foldl(fun monitor/2, M, Items).
demonitor(Item, M) ->
case dict:find(Item, M) of
diff --git a/src/rabbit.erl b/src/rabbit.erl
index fa8dc652..6b730fda 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -258,16 +258,28 @@
%%----------------------------------------------------------------------------
+%% HiPE compilation happens before we have log handlers - so we have
+%% to io:format/2, it's all we can do.
+
maybe_hipe_compile() ->
{ok, Want} = application:get_env(rabbit, hipe_compile),
Can = code:which(hipe) =/= non_existing,
case {Want, Can} of
- {true, true} -> hipe_compile();
- {true, false} -> io:format("Not HiPE compiling: HiPE not found in "
- "this Erlang installation.~n");
- {false, _} -> ok
+ {true, true} -> hipe_compile(),
+ true;
+ {true, false} -> false;
+ {false, _} -> true
end.
+warn_if_hipe_compilation_failed(true) ->
+ ok;
+warn_if_hipe_compilation_failed(false) ->
+ error_logger:warning_msg(
+ "Not HiPE compiling: HiPE not found in this Erlang installation.~n").
+
+%% HiPE compilation happens before we have log handlers and can take a
+%% long time, so make an exception to our no-stdout policy and display
+%% progress via stdout.
hipe_compile() ->
Count = length(?HIPE_WORTHY),
io:format("~nHiPE compiling: |~s|~n |",
@@ -311,14 +323,15 @@ start() ->
rabbit_mnesia:check_cluster_consistency(),
ok = app_utils:start_applications(
app_startup_order(), fun handle_app_error/2),
- ok = print_plugin_info(rabbit_plugins:active())
+ ok = log_broker_started(rabbit_plugins:active())
end).
boot() ->
start_it(fun() ->
ok = ensure_application_loaded(),
- maybe_hipe_compile(),
+ Success = maybe_hipe_compile(),
ok = ensure_working_log_handlers(),
+ warn_if_hipe_compilation_failed(Success),
rabbit_node_monitor:prepare_cluster_status_files(),
ok = rabbit_upgrade:maybe_upgrade_mnesia(),
%% It's important that the consistency check happens after
@@ -332,7 +345,7 @@ boot() ->
false),
ok = app_utils:start_applications(
StartupApps, fun handle_app_error/2),
- ok = print_plugin_info(Plugins)
+ ok = log_broker_started(Plugins)
end).
handle_app_error(App, {bad_return, {_MFA, {'EXIT', {Reason, _}}}}) ->
@@ -400,13 +413,11 @@ status() ->
is_running() -> is_running(node()).
-is_running(Node) ->
- rabbit_nodes:is_running(Node, rabbit).
+is_running(Node) -> rabbit_nodes:is_running(Node, rabbit).
environment() ->
- lists:keysort(
- 1, [P || P = {K, _} <- application:get_all_env(rabbit),
- K =/= default_pass]).
+ lists:keysort(1, [P || P = {K, _} <- application:get_all_env(rabbit),
+ K =/= default_pass]).
rotate_logs(BinarySuffix) ->
Suffix = binary_to_list(BinarySuffix),
@@ -429,8 +440,8 @@ start(normal, []) ->
{ok, SupPid} = rabbit_sup:start_link(),
true = register(rabbit, self()),
print_banner(),
+ log_banner(),
[ok = run_boot_step(Step) || Step <- boot_steps()],
- io:format("~nbroker running~n"),
{ok, SupPid};
Error ->
Error
@@ -459,22 +470,16 @@ app_shutdown_order() ->
%%---------------------------------------------------------------------------
%% boot step logic
-run_boot_step({StepName, Attributes}) ->
- Description = case lists:keysearch(description, 1, Attributes) of
- {value, {_, D}} -> D;
- false -> StepName
- end,
+run_boot_step({_StepName, Attributes}) ->
case [MFA || {mfa, MFA} <- Attributes] of
[] ->
- io:format("-- ~s~n", [Description]);
+ ok;
MFAs ->
- io:format("starting ~-60s ...", [Description]),
[try
apply(M,F,A)
catch
_:Reason -> boot_error(Reason, erlang:get_stacktrace())
end || {M,F,A} <- MFAs],
- io:format("done~n"),
ok
end.
@@ -535,6 +540,9 @@ sort_boot_steps(UnsortedSteps) ->
end])
end.
+-ifdef(use_specs).
+-spec(boot_error/2 :: (term(), not_available | [tuple()]) -> no_return()).
+-endif.
boot_error(Term={error, {timeout_waiting_for_tables, _}}, _Stacktrace) ->
AllNodes = rabbit_mnesia:cluster_nodes(all),
{Err, Nodes} =
@@ -554,13 +562,15 @@ boot_error(Reason, Stacktrace) ->
Args = [Reason, log_location(kernel), log_location(sasl)],
boot_error(Reason, Fmt, Args, Stacktrace).
+-ifdef(use_specs).
+-spec(boot_error/4 :: (term(), string(), [any()], not_available | [tuple()])
+ -> no_return()).
+-endif.
+boot_error(Reason, Fmt, Args, not_available) ->
+ basic_boot_error(Reason, Fmt, Args);
boot_error(Reason, Fmt, Args, Stacktrace) ->
- case Stacktrace of
- not_available -> basic_boot_error(Reason, Fmt, Args);
- _ -> basic_boot_error(Reason, Fmt ++
- "Stack trace:~n ~p~n~n",
- Args ++ [Stacktrace])
- end.
+ basic_boot_error(Reason, Fmt ++ "Stack trace:~n ~p~n~n",
+ Args ++ [Stacktrace]).
basic_boot_error(Reason, Format, Args) ->
io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args),
@@ -686,24 +696,15 @@ force_event_refresh() ->
%%---------------------------------------------------------------------------
%% misc
-print_plugin_info([]) ->
- ok;
-print_plugin_info(Plugins) ->
- %% This gets invoked by rabbitmqctl start_app, outside the context
- %% of the rabbit application
+log_broker_started(Plugins) ->
rabbit_misc:with_local_io(
fun() ->
- io:format("~n-- plugins running~n"),
- [print_plugin_info(
- AppName, element(2, application:get_key(AppName, vsn)))
- || AppName <- Plugins],
- ok
+ error_logger:info_msg(
+ "Server startup complete; plugins are: ~p~n", [Plugins]),
+ io:format("~n Broker running with ~p plugins.~n",
+ [length(Plugins)])
end).
-print_plugin_info(Plugin, Vsn) ->
- Len = 76 - length(Vsn),
- io:format("~-" ++ integer_to_list(Len) ++ "s ~s~n", [Plugin, Vsn]).
-
erts_version_check() ->
FoundVer = erlang:system_info(version),
case rabbit_misc:version_compare(?ERTS_MINIMUM, FoundVer, lte) of
@@ -715,49 +716,44 @@ erts_version_check() ->
print_banner() ->
{ok, Product} = application:get_key(id),
{ok, Version} = application:get_key(vsn),
- ProductLen = string:len(Product),
- io:format("~n"
- "+---+ +---+~n"
- "| | | |~n"
- "| | | |~n"
- "| | | |~n"
- "| +---+ +-------+~n"
- "| |~n"
- "| ~s +---+ |~n"
- "| | | |~n"
- "| ~s +---+ |~n"
- "| |~n"
- "+-------------------+~n"
- "~s~n~s~n~s~n~n",
- [Product, string:right([$v|Version], ProductLen),
- ?PROTOCOL_VERSION,
- ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
+ io:format("~n## ## ~s ~s. ~s"
+ "~n## ## ~s"
+ "~n##########"
+ "~n###### ## Logs: ~s"
+ "~n########## ~s~n",
+ [Product, Version, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE,
+ log_location(kernel), log_location(sasl)]).
+
+log_banner() ->
+ {ok, Product} = application:get_key(id),
+ {ok, Version} = application:get_key(vsn),
Settings = [{"node", node()},
- {"app descriptor", app_location()},
{"home dir", home_dir()},
{"config file(s)", config_files()},
{"cookie hash", rabbit_nodes:cookie_hash()},
{"log", log_location(kernel)},
{"sasl log", log_location(sasl)},
{"database dir", rabbit_mnesia:dir()},
- {"erlang version", erlang:system_info(version)}],
+ {"erlang version", erlang:system_info(otp_release)}],
DescrLen = 1 + lists:max([length(K) || {K, _V} <- Settings]),
Format = fun (K, V) ->
- io:format("~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n",
- [K, V])
+ rabbit_misc:format(
+ "~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n", [K, V])
end,
- lists:foreach(fun ({"config file(s)" = K, []}) ->
- Format(K, "(none)");
- ({"config file(s)" = K, [V0 | Vs]}) ->
- Format(K, V0), [Format("", V) || V <- Vs];
- ({K, V}) ->
- Format(K, V)
- end, Settings),
- io:nl().
-
-app_location() ->
- {ok, Application} = application:get_application(),
- filename:absname(code:where_is_file(atom_to_list(Application) ++ ".app")).
+ Banner = iolist_to_binary(
+ rabbit_misc:format(
+ "~s ~s~n~s~n~s~n",
+ [Product, Version, ?COPYRIGHT_MESSAGE,
+ ?INFORMATION_MESSAGE]) ++
+ [case S of
+ {"config file(s)" = K, []} ->
+ Format(K, "(none)");
+ {"config file(s)" = K, [V0 | Vs]} ->
+ Format(K, V0), [Format("", V) || V <- Vs];
+ {K, V} ->
+ Format(K, V)
+ end || S <- Settings]),
+ error_logger:info_msg("~s", [Banner]).
home_dir() ->
case init:get_argument(home) of
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 25357b6e..362b11aa 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -67,9 +67,8 @@ start() ->
stop() -> ok.
-register(Pid, HighMemMFA) ->
- gen_event:call(?SERVER, ?MODULE, {register, Pid, HighMemMFA},
- infinity).
+register(Pid, AlertMFA) ->
+ gen_event:call(?SERVER, ?MODULE, {register, Pid, AlertMFA}, infinity).
set_alarm(Alarm) -> gen_event:notify(?SERVER, {set_alarm, Alarm}).
clear_alarm(Alarm) -> gen_event:notify(?SERVER, {clear_alarm, Alarm}).
@@ -94,9 +93,9 @@ init([]) ->
alarmed_nodes = dict:new(),
alarms = []}}.
-handle_call({register, Pid, HighMemMFA}, State) ->
+handle_call({register, Pid, AlertMFA}, State) ->
{ok, 0 < dict:size(State#alarms.alarmed_nodes),
- internal_register(Pid, HighMemMFA, State)};
+ internal_register(Pid, AlertMFA, State)};
handle_call(get_alarms, State = #alarms{alarms = Alarms}) ->
{ok, Alarms, State};
@@ -121,8 +120,8 @@ handle_event({node_up, Node}, State) ->
handle_event({node_down, Node}, State) ->
{ok, maybe_alert(fun dict_unappend_all/3, Node, [], State)};
-handle_event({register, Pid, HighMemMFA}, State) ->
- {ok, internal_register(Pid, HighMemMFA, State)};
+handle_event({register, Pid, AlertMFA}, State) ->
+ {ok, internal_register(Pid, AlertMFA, State)};
handle_event(_Event, State) ->
{ok, State}.
@@ -198,14 +197,14 @@ alert(Alertees, Source, Alert, NodeComparator) ->
end
end, ok, Alertees).
-internal_register(Pid, {M, F, A} = HighMemMFA,
+internal_register(Pid, {M, F, A} = AlertMFA,
State = #alarms{alertees = Alertees}) ->
_MRef = erlang:monitor(process, Pid),
case dict:find(node(), State#alarms.alarmed_nodes) of
{ok, Sources} -> [apply(M, F, A ++ [Pid, R, true]) || R <- Sources];
error -> ok
end,
- NewAlertees = dict:store(Pid, HighMemMFA, Alertees),
+ NewAlertees = dict:store(Pid, AlertMFA, Alertees),
State#alarms{alertees = NewAlertees}.
handle_set_alarm({{resource_limit, Source, Node}, []}, State) ->
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 07895ae3..9228755e 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,10 +31,11 @@
-export([notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-export([update/2, store_queue/1, policy_changed/2]).
--export([start_mirroring/1, stop_mirroring/1]).
+-export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1,
+ cancel_sync_mirrors/1]).
%% internal
--export([internal_declare/2, internal_delete/2, run_backing_queue/3,
+-export([internal_declare/2, internal_delete/1, run_backing_queue/3,
set_ram_duration_target/2, set_maximum_since_use/2]).
-include("rabbit.hrl").
@@ -156,11 +157,11 @@
-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(flush_all/2 :: (qpids(), pid()) -> 'ok').
--spec(internal_delete/2 ::
- (name(), pid()) -> rabbit_types:ok_or_error('not_found') |
- rabbit_types:connection_exit() |
- fun (() -> rabbit_types:ok_or_error('not_found') |
- rabbit_types:connection_exit())).
+-spec(internal_delete/1 ::
+ (name()) -> rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit() |
+ fun (() -> rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit())).
-spec(run_backing_queue/3 ::
(pid(), atom(),
(fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
@@ -173,6 +174,8 @@
(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok').
-spec(start_mirroring/1 :: (pid()) -> 'ok').
-spec(stop_mirroring/1 :: (pid()) -> 'ok').
+-spec(sync_mirrors/1 :: (pid()) -> 'ok' | rabbit_types:error('not_mirrored')).
+-spec(cancel_sync_mirrors/1 :: (pid()) -> 'ok' | {'ok', 'not_syncing'}).
-endif.
@@ -257,7 +260,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
[ExistingQ = #amqqueue{pid = QPid}] ->
case rabbit_misc:is_process_alive(QPid) of
true -> rabbit_misc:const(ExistingQ);
- false -> TailFun = internal_delete(QueueName, QPid),
+ false -> TailFun = internal_delete(QueueName),
fun () -> TailFun(), ExistingQ end
end
end
@@ -302,6 +305,8 @@ add_default_binding(#amqqueue{name = QueueName}) ->
key = RoutingKey,
args = []}).
+lookup([]) -> []; %% optimisation
+lookup([Name]) -> ets:lookup(rabbit_queue, Name); %% optimisation
lookup(Names) when is_list(Names) ->
%% Normally we'd call mnesia:dirty_read/1 here, but that is quite
%% expensive for reasons explained in rabbit_misc:dirty_read/1.
@@ -380,14 +385,10 @@ with_exclusive_access_or_die(Name, ReaderPid, F) ->
assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
RequiredArgs) ->
- rabbit_misc:assert_args_equivalence(
- Args, RequiredArgs, QueueName, [<<"x-expires">>, <<"x-message-ttl">>]).
+ rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName,
+ [Key || {Key, _Fun} <- args()]).
check_declare_arguments(QueueName, Args) ->
- Checks = [{<<"x-expires">>, fun check_expires_arg/2},
- {<<"x-message-ttl">>, fun check_message_ttl_arg/2},
- {<<"x-dead-letter-exchange">>, fun check_string_arg/2},
- {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}],
[case rabbit_misc:table_lookup(Args, Key) of
undefined -> ok;
TypeVal -> case Fun(TypeVal, Args) of
@@ -398,9 +399,15 @@ check_declare_arguments(QueueName, Args) ->
[Key, rabbit_misc:rs(QueueName),
Error])
end
- end || {Key, Fun} <- Checks],
+ end || {Key, Fun} <- args()],
ok.
+args() ->
+ [{<<"x-expires">>, fun check_expires_arg/2},
+ {<<"x-message-ttl">>, fun check_message_ttl_arg/2},
+ {<<"x-dead-letter-exchange">>, fun check_string_arg/2},
+ {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}].
+
check_string_arg({longstr, _}, _Args) -> ok;
check_string_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}.
@@ -569,7 +576,7 @@ internal_delete1(QueueName) ->
%% after the transaction.
rabbit_binding:remove_for_destination(QueueName).
-internal_delete(QueueName, QPid) ->
+internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
@@ -579,8 +586,7 @@ internal_delete(QueueName, QPid) ->
fun() ->
ok = T(),
ok = rabbit_event:notify(queue_deleted,
- [{pid, QPid},
- {name, QueueName}])
+ [{name, QueueName}])
end
end
end).
@@ -597,10 +603,13 @@ set_maximum_since_use(QPid, Age) ->
start_mirroring(QPid) -> ok = delegate:cast(QPid, start_mirroring).
stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring).
+sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors).
+cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mirrors).
+
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> QsDels =
- qlc:e(qlc:q([{{QName, Pid}, delete_queue(QName)} ||
+ qlc:e(qlc:q([{QName, delete_queue(QName)} ||
#amqqueue{name = QName, pid = Pid,
slave_pids = []}
<- mnesia:table(rabbit_queue),
@@ -613,10 +622,9 @@ on_node_down(Node) ->
fun () ->
T(),
lists:foreach(
- fun({QName, QPid}) ->
+ fun(QName) ->
ok = rabbit_event:notify(queue_deleted,
- [{pid, QPid},
- {name, QName}])
+ [{name, QName}])
end, Qs)
end
end).
@@ -674,6 +682,8 @@ deliver(Qs, Delivery, _Flow) ->
R -> {routed, [QPid || {QPid, ok} <- R]}
end.
+qpids([]) -> {[], []}; %% optimisation
+qpids([#amqqueue{pid = QPid, slave_pids = SPids}]) -> {[QPid], SPids}; %% opt
qpids(Qs) ->
{MPids, SPids} = lists:foldl(fun (#amqqueue{pid = QPid, slave_pids = SPids},
{MPidAcc, SPidAcc}) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 49fcf070..fe3a6099 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -54,7 +54,8 @@
delayed_stop,
queue_monitors,
dlx,
- dlx_routing_key
+ dlx_routing_key,
+ status
}).
-record(consumer, {tag, ack_required}).
@@ -85,7 +86,7 @@
%%----------------------------------------------------------------------------
-define(STATISTICS_KEYS,
- [pid,
+ [name,
policy,
exclusive_consumer_pid,
exclusive_consumer_tag,
@@ -93,24 +94,22 @@
messages_unacknowledged,
messages,
consumers,
- active_consumers,
memory,
slave_pids,
synchronised_slave_pids,
- backing_queue_status
+ backing_queue_status,
+ status
]).
-define(CREATION_EVENT_KEYS,
- [pid,
- name,
+ [name,
durable,
auto_delete,
arguments,
owner_pid
]).
--define(INFO_KEYS,
- ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+-define(INFO_KEYS, [pid | ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [name]]).
%%----------------------------------------------------------------------------
@@ -122,26 +121,7 @@ info_keys() -> ?INFO_KEYS.
init(Q) ->
process_flag(trap_exit, true),
- State = #q{q = Q#amqqueue{pid = self()},
- exclusive_consumer = none,
- has_had_consumers = false,
- backing_queue = undefined,
- backing_queue_state = undefined,
- active_consumers = queue:new(),
- expires = undefined,
- sync_timer_ref = undefined,
- rate_timer_ref = undefined,
- expiry_timer_ref = undefined,
- ttl = undefined,
- senders = pmon:new(),
- dlx = undefined,
- dlx_routing_key = undefined,
- publish_seqno = 1,
- unconfirmed = dtree:empty(),
- delayed_stop = undefined,
- queue_monitors = pmon:new(),
- msg_id_to_channel = gb_trees:empty()},
- {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate,
+ {ok, init_state(Q#amqqueue{pid = self()}), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
@@ -150,27 +130,29 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
none -> ok;
_ -> erlang:monitor(process, Owner)
end,
+ State = init_state(Q),
+ State1 = State#q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = RateTRef,
+ senders = Senders,
+ msg_id_to_channel = MTC},
+ State2 = process_args(State1),
+ lists:foldl(fun (Delivery, StateN) ->
+ deliver_or_enqueue(Delivery, true, StateN)
+ end, State2, Deliveries).
+
+init_state(Q) ->
State = #q{q = Q,
exclusive_consumer = none,
has_had_consumers = false,
- backing_queue = BQ,
- backing_queue_state = BQS,
active_consumers = queue:new(),
- expires = undefined,
- sync_timer_ref = undefined,
- rate_timer_ref = RateTRef,
- expiry_timer_ref = undefined,
- ttl = undefined,
- senders = Senders,
+ senders = pmon:new(),
publish_seqno = 1,
unconfirmed = dtree:empty(),
- delayed_stop = undefined,
queue_monitors = pmon:new(),
- msg_id_to_channel = MTC},
- State1 = process_args(rabbit_event:init_stats_timer(State, #q.stats_timer)),
- lists:foldl(fun (Delivery, StateN) ->
- deliver_or_enqueue(Delivery, true, StateN)
- end, State1, Deliveries).
+ msg_id_to_channel = gb_trees:empty(),
+ status = running},
+ rabbit_event:init_stats_timer(State, #q.stats_timer).
terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
@@ -182,7 +164,7 @@ terminate(Reason, State = #q{q = #amqqueue{name = QName},
fun (BQS) ->
BQS1 = BQ:delete_and_terminate(Reason, BQS),
%% don't care if the internal delete doesn't return 'ok'.
- rabbit_amqqueue:internal_delete(QName, self()),
+ rabbit_amqqueue:internal_delete(QName),
BQS1
end, State).
@@ -264,7 +246,7 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
-init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}).
+init_ttl(TTL, State) -> drop_expired_msgs(State#q{ttl = TTL}).
init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) ->
State#q{dlx = rabbit_misc:r(QName, exchange, DLX)}.
@@ -274,11 +256,16 @@ init_dlx_routing_key(RoutingKey, State) ->
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue_state = BQS} =
- stop_sync_timer(stop_rate_timer(State)),
+ lists:foldl(fun (F, S) -> F(S) end, State,
+ [fun stop_sync_timer/1,
+ fun stop_rate_timer/1,
+ fun stop_expiry_timer/1,
+ fun stop_ttl_timer/1]),
case BQS of
undefined -> State1;
_ -> ok = rabbit_memory_monitor:deregister(self()),
- [emit_consumer_deleted(Ch, CTag)
+ QName = qname(State),
+ [emit_consumer_deleted(Ch, CTag, QName)
|| {Ch, CTag, _} <- consumers(State1)],
State1#q{backing_queue_state = Fun(BQS)}
end.
@@ -308,36 +295,18 @@ backing_queue_module(Q) ->
true -> rabbit_mirror_queue_master
end.
-ensure_sync_timer(State = #q{sync_timer_ref = undefined}) ->
- TRef = erlang:send_after(?SYNC_INTERVAL, self(), sync_timeout),
- State#q{sync_timer_ref = TRef};
ensure_sync_timer(State) ->
- State.
+ rabbit_misc:ensure_timer(State, #q.sync_timer_ref,
+ ?SYNC_INTERVAL, sync_timeout).
-stop_sync_timer(State = #q{sync_timer_ref = undefined}) ->
- State;
-stop_sync_timer(State = #q{sync_timer_ref = TRef}) ->
- erlang:cancel_timer(TRef),
- State#q{sync_timer_ref = undefined}.
-
-ensure_rate_timer(State = #q{rate_timer_ref = undefined}) ->
- TRef = erlang:send_after(
- ?RAM_DURATION_UPDATE_INTERVAL, self(), update_ram_duration),
- State#q{rate_timer_ref = TRef};
-ensure_rate_timer(State) ->
- State.
+stop_sync_timer(State) -> rabbit_misc:stop_timer(State, #q.sync_timer_ref).
-stop_rate_timer(State = #q{rate_timer_ref = undefined}) ->
- State;
-stop_rate_timer(State = #q{rate_timer_ref = TRef}) ->
- erlang:cancel_timer(TRef),
- State#q{rate_timer_ref = undefined}.
+ensure_rate_timer(State) ->
+ rabbit_misc:ensure_timer(State, #q.rate_timer_ref,
+ ?RAM_DURATION_UPDATE_INTERVAL,
+ update_ram_duration).
-stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) ->
- State;
-stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) ->
- erlang:cancel_timer(TRef),
- State#q{expiry_timer_ref = undefined}.
+stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #q.rate_timer_ref).
%% We wish to expire only when there are no consumers *and* the expiry
%% hasn't been refreshed (by queue.declare or basic.get) for the
@@ -347,11 +316,34 @@ ensure_expiry_timer(State = #q{expires = undefined}) ->
ensure_expiry_timer(State = #q{expires = Expires}) ->
case is_unused(State) of
true -> NewState = stop_expiry_timer(State),
- TRef = erlang:send_after(Expires, self(), maybe_expire),
- NewState#q{expiry_timer_ref = TRef};
+ rabbit_misc:ensure_timer(NewState, #q.expiry_timer_ref,
+ Expires, maybe_expire);
false -> State
end.
+stop_expiry_timer(State) -> rabbit_misc:stop_timer(State, #q.expiry_timer_ref).
+
+ensure_ttl_timer(undefined, State) ->
+ State;
+ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) ->
+ After = (case Expiry - now_micros() of
+ V when V > 0 -> V + 999; %% always fire later
+ _ -> 0
+ end) div 1000,
+ TRef = erlang:send_after(After, self(), drop_expired),
+ State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry};
+ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef,
+ ttl_timer_expiry = TExpiry})
+ when Expiry + 1000 < TExpiry ->
+ case erlang:cancel_timer(TRef) of
+ false -> State;
+ _ -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined})
+ end;
+ensure_ttl_timer(_Expiry, State) ->
+ State.
+
+stop_ttl_timer(State) -> rabbit_misc:stop_timer(State, #q.ttl_timer_ref).
+
ensure_stats_timer(State) ->
rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats).
@@ -371,7 +363,7 @@ ch_record(ChPid) ->
undefined -> MonitorRef = erlang:monitor(process, ChPid),
C = #cr{ch_pid = ChPid,
monitor_ref = MonitorRef,
- acktags = sets:new(),
+ acktags = queue:new(),
consumer_count = 0,
blocked_consumers = queue:new(),
is_limit_active = false,
@@ -385,9 +377,9 @@ ch_record(ChPid) ->
update_ch_record(C = #cr{consumer_count = ConsumerCount,
acktags = ChAckTags,
unsent_message_count = UnsentMessageCount}) ->
- case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount} of
- {0, 0, 0} -> ok = erase_ch_record(C);
- _ -> ok = store_ch_record(C)
+ case {queue:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of
+ {true, 0, 0} -> ok = erase_ch_record(C);
+ _ -> ok = store_ch_record(C)
end,
C.
@@ -470,7 +462,7 @@ deliver_msg_to_consumer(DeliverFun,
rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
ChAckTags1 = case AckRequired of
- true -> sets:add_element(AckTag, ChAckTags);
+ true -> queue:in(AckTag, ChAckTags);
false -> ChAckTags
end,
update_ch_record(C#cr{acktags = ChAckTags1,
@@ -478,11 +470,10 @@ deliver_msg_to_consumer(DeliverFun,
{Stop, State1}.
deliver_from_queue_deliver(AckRequired, State) ->
- {{Message, IsDelivered, AckTag, _Remaining}, State1} =
- fetch(AckRequired, State),
+ {Result, State1} = fetch(AckRequired, State),
State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_messages(State1),
- {{Message, IsDelivered, AckTag}, BQ:is_empty(BQS), State2}.
+ drop_expired_msgs(State1),
+ {Result, BQ:is_empty(BQS), State2}.
confirm_messages([], State) ->
State;
@@ -518,25 +509,28 @@ send_or_record_confirm(#delivery{sender = SenderPid,
rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
{immediately, State}.
-discard(#delivery{sender = SenderPid, message = #basic_message{id = MsgId}},
- State) ->
- %% fake an 'eventual' confirm from BQ; noop if not needed
+discard(#delivery{sender = SenderPid,
+ msg_seq_no = MsgSeqNo,
+ message = #basic_message{id = MsgId}}, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- confirm_messages([MsgId], State),
+ case MsgSeqNo of
+ undefined -> State;
+ _ -> confirm_messages([MsgId], State)
+ end,
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
State1#q{backing_queue_state = BQS1}.
run_message_queue(State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_messages(State),
+ drop_expired_msgs(State),
{_IsEmpty1, State2} = deliver_msgs_to_consumers(
fun deliver_from_queue_deliver/2,
BQ:is_empty(BQS), State1),
State2.
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
- Props = #message_properties{delivered = Delivered},
- State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ Props, Delivered, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
case BQ:is_duplicate(Message, BQS) of
{false, BQS1} ->
deliver_msgs_to_consumers(
@@ -558,15 +552,15 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
Delivered, State) ->
{Confirm, State1} = send_or_record_confirm(Delivery, State),
- Props = message_properties(Message, Confirm, Delivered, State),
- case attempt_delivery(Delivery, Props, State1) of
+ Props = message_properties(Message, Confirm, State),
+ case attempt_delivery(Delivery, Props, Delivered, State1) of
{true, State2} ->
State2;
%% The next one is an optimisation
{false, State2 = #q{ttl = 0, dlx = undefined}} ->
discard(Delivery, State2);
{false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
- BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
+ BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
ensure_ttl_timer(Props#message_properties.expiry,
State2#q{backing_queue_state = BQS1})
end.
@@ -598,9 +592,9 @@ remove_consumer(ChPid, ConsumerTag, Queue) ->
(CP /= ChPid) or (CTag /= ConsumerTag)
end, Queue).
-remove_consumers(ChPid, Queue) ->
+remove_consumers(ChPid, Queue, QName) ->
queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid ->
- emit_consumer_deleted(ChPid, CTag),
+ emit_consumer_deleted(ChPid, CTag, QName),
false;
(_) ->
true
@@ -641,7 +635,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
blocked_consumers = Blocked} ->
- _ = remove_consumers(ChPid, Blocked), %% for stats emission
+ QName = qname(State),
+ _ = remove_consumers(ChPid, Blocked, QName), %% for stats emission
ok = erase_ch_record(C),
State1 = State#q{
exclusive_consumer = case Holder of
@@ -649,11 +644,12 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
Other -> Other
end,
active_consumers = remove_consumers(
- ChPid, State#q.active_consumers),
+ ChPid, State#q.active_consumers,
+ QName),
senders = Senders1},
case should_auto_delete(State1) of
true -> {stop, State1};
- false -> {ok, requeue_and_run(sets:to_list(ChAckTags),
+ false -> {ok, requeue_and_run(queue:to_list(ChAckTags),
ensure_expiry_timer(State1))}
end
end.
@@ -668,13 +664,8 @@ check_exclusive_access(none, true, State) ->
false -> in_use
end.
-consumer_count() -> consumer_count(fun (_) -> false end).
-
-active_consumer_count() -> consumer_count(fun is_ch_blocked/1).
-
-consumer_count(Exclude) ->
- lists:sum([Count || C = #cr{consumer_count = Count} <- all_ch_record(),
- not Exclude(C)]).
+consumer_count() ->
+ lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
is_unused(_State) -> consumer_count() == 0.
@@ -692,15 +683,24 @@ subtract_acks(ChPid, AckTags, State, Fun) ->
not_found ->
State;
C = #cr{acktags = ChAckTags} ->
- update_ch_record(C#cr{acktags = lists:foldl(fun sets:del_element/2,
- ChAckTags, AckTags)}),
+ update_ch_record(
+ C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}),
Fun(State)
end.
-message_properties(Message, Confirm, Delivered, #q{ttl = TTL}) ->
+subtract_acks([], [], AckQ) ->
+ AckQ;
+subtract_acks([], Prefix, AckQ) ->
+ queue:join(queue:from_list(lists:reverse(Prefix)), AckQ);
+subtract_acks([T | TL] = AckTags, Prefix, AckQ) ->
+ case queue:out(AckQ) of
+ {{value, T}, QTail} -> subtract_acks(TL, Prefix, QTail);
+ {{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail)
+ end.
+
+message_properties(Message, Confirm, #q{ttl = TTL}) ->
#message_properties{expiry = calculate_msg_expiry(Message, TTL),
- needs_confirming = Confirm == eventually,
- delivered = Delivered}.
+ needs_confirming = Confirm == eventually}.
calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
#content{properties = Props} =
@@ -712,50 +712,66 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
T -> now_micros() + T * 1000
end.
-drop_expired_messages(State = #q{dlx = DLX,
- backing_queue_state = BQS,
- backing_queue = BQ }) ->
+drop_expired_msgs(State = #q{backing_queue_state = BQS,
+ backing_queue = BQ }) ->
Now = now_micros(),
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
- {Props, BQS1} = case DLX of
- undefined -> {Next, undefined, BQS2} =
- BQ:dropwhile(ExpirePred, false, BQS),
- {Next, BQS2};
- _ -> {Next, Msgs, BQS2} =
- BQ:dropwhile(ExpirePred, true, BQS),
- DLXFun = dead_letter_fun(expired),
- DLXFun(Msgs),
- {Next, BQS2}
- end,
+ {Props, State1} =
+ with_dlx(
+ State#q.dlx,
+ fun (X) -> dead_letter_expired_msgs(ExpirePred, X, State) end,
+ fun () -> {Next, BQS1} = BQ:dropwhile(ExpirePred, BQS),
+ {Next, State#q{backing_queue_state = BQS1}} end),
ensure_ttl_timer(case Props of
undefined -> undefined;
#message_properties{expiry = Exp} -> Exp
- end, State#q{backing_queue_state = BQS1}).
-
-ensure_ttl_timer(undefined, State) ->
- State;
-ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) ->
- After = (case Expiry - now_micros() of
- V when V > 0 -> V + 999; %% always fire later
- _ -> 0
- end) div 1000,
- TRef = erlang:send_after(After, self(), drop_expired),
- State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry};
-ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef,
- ttl_timer_expiry = TExpiry})
- when Expiry + 1000 < TExpiry ->
- case erlang:cancel_timer(TRef) of
- false -> State;
- _ -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined})
- end;
-ensure_ttl_timer(_Expiry, State) ->
- State.
-
-dead_letter_fun(Reason) ->
- fun(Msgs) -> gen_server2:cast(self(), {dead_letter, Msgs, Reason}) end.
-
-dead_letter_publish(Msg, Reason, X, State = #q{publish_seqno = MsgSeqNo}) ->
- DLMsg = make_dead_letter_msg(Reason, Msg, State),
+ end, State1).
+
+with_dlx(undefined, _With, Without) -> Without();
+with_dlx(DLX, With, Without) -> case rabbit_exchange:lookup(DLX) of
+ {ok, X} -> With(X);
+ {error, not_found} -> Without()
+ end.
+
+dead_letter_expired_msgs(ExpirePred, X, State = #q{backing_queue = BQ}) ->
+ dead_letter_msgs(fun (DLFun, Acc, BQS1) ->
+ BQ:fetchwhile(ExpirePred, DLFun, Acc, BQS1)
+ end, expired, X, State).
+
+dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) ->
+ {ok, State1} =
+ dead_letter_msgs(
+ fun (DLFun, Acc, BQS) ->
+ {Acc1, BQS1} = BQ:ackfold(DLFun, Acc, BQS, AckTags),
+ {ok, Acc1, BQS1}
+ end, rejected, X, State),
+ State1.
+
+dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK,
+ publish_seqno = SeqNo0,
+ unconfirmed = UC0,
+ queue_monitors = QMons0,
+ backing_queue_state = BQS,
+ backing_queue = BQ}) ->
+ QName = qname(State),
+ {Res, {AckImm1, SeqNo1, UC1, QMons1}, BQS1} =
+ Fun(fun (Msg, AckTag, {AckImm, SeqNo, UC, QMons}) ->
+ case dead_letter_publish(Msg, Reason,
+ X, RK, SeqNo, QName) of
+ [] -> {[AckTag | AckImm], SeqNo, UC, QMons};
+ QPids -> {AckImm, SeqNo + 1,
+ dtree:insert(SeqNo, QPids, AckTag, UC),
+ pmon:monitor_all(QPids, QMons)}
+ end
+ end, {[], SeqNo0, UC0, QMons0}, BQS),
+ {_Guids, BQS2} = BQ:ack(AckImm1, BQS1),
+ {Res, State#q{publish_seqno = SeqNo1,
+ unconfirmed = UC1,
+ queue_monitors = QMons1,
+ backing_queue_state = BQS2}}.
+
+dead_letter_publish(Msg, Reason, X, RK, MsgSeqNo, QName) ->
+ DLMsg = make_dead_letter_msg(Msg, Reason, X#exchange.name, RK, QName),
Delivery = rabbit_basic:delivery(false, DLMsg, MsgSeqNo),
{Queues, Cycles} = detect_dead_letter_cycles(
DLMsg, rabbit_exchange:route(X, Delivery)),
@@ -787,12 +803,9 @@ stop(State) -> stop(undefined, noreply, State).
stop(From, Reply, State = #q{unconfirmed = UC}) ->
case {dtree:is_empty(UC), Reply} of
- {true, noreply} ->
- {stop, normal, State};
- {true, _} ->
- {stop, normal, Reply, State};
- {false, _} ->
- noreply(State#q{delayed_stop = {From, Reply}})
+ {true, noreply} -> {stop, normal, State};
+ {true, _} -> {stop, normal, Reply, State};
+ {false, _} -> noreply(State#q{delayed_stop = {From, Reply}})
end.
cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
@@ -837,19 +850,16 @@ detect_dead_letter_cycles(#basic_message{content = Content}, Queues) ->
end
end.
-make_dead_letter_msg(Reason,
- Msg = #basic_message{content = Content,
+make_dead_letter_msg(Msg = #basic_message{content = Content,
exchange_name = Exchange,
routing_keys = RoutingKeys},
- State = #q{dlx = DLX, dlx_routing_key = DlxRoutingKey}) ->
+ Reason, DLX, RK, #resource{name = QName}) ->
{DeathRoutingKeys, HeadersFun1} =
- case DlxRoutingKey of
+ case RK of
undefined -> {RoutingKeys, fun (H) -> H end};
- _ -> {[DlxRoutingKey],
- fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
+ _ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
end,
ReasonBin = list_to_binary(atom_to_list(Reason)),
- #resource{name = QName} = qname(State),
TimeSec = rabbit_misc:now_ms() div 1000,
HeadersFun2 =
fun (Headers) ->
@@ -900,14 +910,12 @@ i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) ->
i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:len(BQS);
i(messages_unacknowledged, _) ->
- lists:sum([sets:size(C#cr.acktags) || C <- all_ch_record()]);
+ lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]);
i(messages, State) ->
lists:sum([i(Item, State) || Item <- [messages_ready,
messages_unacknowledged]]);
i(consumers, _) ->
consumer_count();
-i(active_consumers, _) ->
- active_consumer_count();
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
@@ -925,6 +933,8 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) ->
false -> '';
true -> SSPids
end;
+i(status, #q{status = Status}) ->
+ Status;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
i(Item, _) ->
@@ -946,19 +956,19 @@ emit_stats(State) ->
emit_stats(State, Extra) ->
rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)).
-emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) ->
+emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired, QName) ->
rabbit_event:notify(consumer_created,
[{consumer_tag, ConsumerTag},
{exclusive, Exclusive},
{ack_required, AckRequired},
{channel, ChPid},
- {queue, self()}]).
+ {queue, QName}]).
-emit_consumer_deleted(ChPid, ConsumerTag) ->
+emit_consumer_deleted(ChPid, ConsumerTag, QName) ->
rabbit_event:notify(consumer_deleted,
[{consumer_tag, ConsumerTag},
{channel, ChPid},
- {queue, self()}]).
+ {queue, QName}]).
%%----------------------------------------------------------------------------
@@ -1049,27 +1059,26 @@ handle_call({basic_get, ChPid, NoAck}, _From,
State = #q{q = #amqqueue{name = QName}}) ->
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
- case fetch(AckRequired, drop_expired_messages(State1)) of
+ case fetch(AckRequired, drop_expired_msgs(State1)) of
{empty, State2} ->
reply(empty, State2);
- {{Message, IsDelivered, AckTag, Remaining}, State2} ->
- State3 =
+ {{Message, IsDelivered, AckTag}, State2} ->
+ State3 = #q{backing_queue = BQ, backing_queue_state = BQS} =
case AckRequired of
true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
- ChAckTags1 = sets:add_element(AckTag, ChAckTags),
+ ChAckTags1 = queue:in(AckTag, ChAckTags),
update_ch_record(C#cr{acktags = ChAckTags1}),
State2;
false -> State2
end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
- reply({ok, Remaining, Msg}, State3)
+ reply({ok, BQ:len(BQS), Msg}, State3)
end;
handle_call({basic_consume, NoAck, ChPid, Limiter,
ConsumerTag, ExclusiveConsume, OkMsg},
- _From, State = #q{exclusive_consumer = ExistingHolder}) ->
- case check_exclusive_access(ExistingHolder, ExclusiveConsume,
- State) of
+ _From, State = #q{exclusive_consumer = Holder}) ->
+ case check_exclusive_access(Holder, ExclusiveConsume, State) of
in_use ->
reply({error, exclusive_consume_unavailable}, State);
ok ->
@@ -1078,7 +1087,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
- true -> ExistingHolder
+ true -> Holder
end,
State1 = State#q{has_had_consumers = true,
exclusive_consumer = ExclusiveConsumer},
@@ -1093,7 +1102,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
run_message_queue(State1#q{active_consumers = AC1})
end,
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck),
+ not NoAck, qname(State2)),
reply(ok, State2)
end;
@@ -1104,7 +1113,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
not_found ->
reply(ok, State);
C = #cr{blocked_consumers = Blocked} ->
- emit_consumer_deleted(ChPid, ConsumerTag),
+ emit_consumer_deleted(ChPid, ConsumerTag, qname(State)),
Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked),
update_consumer_count(C#cr{blocked_consumers = Blocked1}, -1),
State1 = State#q{
@@ -1114,7 +1123,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
end,
active_consumers = remove_consumer(
ChPid, ConsumerTag,
- State#q.active_consumers)},
+ State#q.active_consumers)},
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
true -> stop(From, ok, State1)
@@ -1123,16 +1132,16 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
handle_call(stat, _From, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_messages(ensure_expiry_timer(State)),
- reply({ok, BQ:len(BQS), active_consumer_count()}, State1);
+ drop_expired_msgs(ensure_expiry_timer(State)),
+ reply({ok, BQ:len(BQS), consumer_count()}, State1);
handle_call({delete, IfUnused, IfEmpty}, From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
- IsEmpty = BQ:is_empty(BQS),
+ IsEmpty = BQ:is_empty(BQS),
IsUnused = is_unused(State),
if
- IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State);
- IfUnused and not(IsUnused) -> reply({error, in_use}, State);
+ IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State);
+ IfUnused and not(IsUnused) -> reply({error, in_use}, State);
true -> stop(From, {ok, BQ:len(BQS)}, State)
end;
@@ -1145,14 +1154,45 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
noreply(requeue(AckTags, ChPid, State));
+handle_call(sync_mirrors, _From,
+ State = #q{backing_queue = rabbit_mirror_queue_master,
+ backing_queue_state = BQS}) ->
+ S = fun(BQSN) -> State#q{backing_queue_state = BQSN} end,
+ HandleInfo = fun (Status) ->
+ receive {'$gen_call', From, {info, Items}} ->
+ Infos = infos(Items, State#q{status = Status}),
+ gen_server2:reply(From, {ok, Infos})
+ after 0 ->
+ ok
+ end
+ end,
+ EmitStats = fun (Status) ->
+ rabbit_event:if_enabled(
+ State, #q.stats_timer,
+ fun() -> emit_stats(State#q{status = Status}) end)
+ end,
+ case rabbit_mirror_queue_master:sync_mirrors(HandleInfo, EmitStats, BQS) of
+ {ok, BQS1} -> reply(ok, S(BQS1));
+ {stop, Reason, BQS1} -> {stop, Reason, S(BQS1)}
+ end;
+
+handle_call(sync_mirrors, _From, State) ->
+ reply({error, not_mirrored}, State);
+
+%% By definition if we get this message here we do not have to do anything.
+handle_call(cancel_sync_mirrors, _From, State) ->
+ reply({ok, not_syncing}, State);
+
handle_call(force_event_refresh, _From,
State = #q{exclusive_consumer = Exclusive}) ->
rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
+ QName = qname(State),
case Exclusive of
- none -> [emit_consumer_created(Ch, CTag, false, AckRequired) ||
+ none -> [emit_consumer_created(
+ Ch, CTag, false, AckRequired, QName) ||
{Ch, CTag, AckRequired} <- consumers(State)];
{Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State),
- emit_consumer_created(Ch, CTag, true, AckRequired)
+ emit_consumer_created(Ch, CTag, true, AckRequired, QName)
end,
reply(ok, State).
@@ -1191,19 +1231,15 @@ handle_cast({ack, AckTags, ChPid}, State) ->
handle_cast({reject, AckTags, true, ChPid}, State) ->
noreply(requeue(AckTags, ChPid, State));
-handle_cast({reject, AckTags, false, ChPid}, State = #q{dlx = undefined}) ->
- noreply(ack(AckTags, ChPid, State));
-
handle_cast({reject, AckTags, false, ChPid}, State) ->
- DLXFun = dead_letter_fun(rejected),
- noreply(subtract_acks(
- ChPid, AckTags, State,
- fun (State1 = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- BQS1 = BQ:fold(fun(M, A) -> DLXFun([{M, A}]) end,
- BQS, AckTags),
- State1#q{backing_queue_state = BQS1}
- end));
+ noreply(with_dlx(
+ State#q.dlx,
+ fun (X) -> subtract_acks(ChPid, AckTags, State,
+ fun (State1) ->
+ dead_letter_rejected_msgs(
+ AckTags, X, State1)
+ end) end,
+ fun () -> ack(AckTags, ChPid, State) end));
handle_cast(delete_immediately, State) ->
stop(State);
@@ -1249,31 +1285,6 @@ handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);
-handle_cast({dead_letter, Msgs, Reason}, State = #q{dlx = XName}) ->
- case rabbit_exchange:lookup(XName) of
- {ok, X} ->
- {AckImmediately, State2} =
- lists:foldl(
- fun({Msg, AckTag},
- {Acks, State1 = #q{publish_seqno = SeqNo,
- unconfirmed = UC,
- queue_monitors = QMons}}) ->
- case dead_letter_publish(Msg, Reason, X, State1) of
- [] -> {[AckTag | Acks], State1};
- QPids -> UC1 = dtree:insert(
- SeqNo, QPids, AckTag, UC),
- QMons1 = pmon:monitor_all(QPids, QMons),
- {Acks,
- State1#q{publish_seqno = SeqNo + 1,
- unconfirmed = UC1,
- queue_monitors = QMons1}}
- end
- end, {[], State}, Msgs),
- cleanup_after_confirm(AckImmediately, State2);
- {error, not_found} ->
- cleanup_after_confirm([AckTag || {_, AckTag} <- Msgs], State)
- end;
-
handle_cast(start_mirroring, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
%% lookup again to get policy for init_with_existing_bq
@@ -1310,7 +1321,7 @@ handle_info(maybe_expire, State) ->
end;
handle_info(drop_expired, State) ->
- noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined}));
+ noreply(drop_expired_msgs(State#q{ttl_timer_ref = undefined}));
handle_info(emit_stats, State) ->
emit_stats(State),
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index 44231f7b..2dc1cad3 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -49,7 +49,7 @@
-spec(hash_password/1 :: (rabbit_types:password())
-> rabbit_types:password_hash()).
-spec(set_tags/2 :: (rabbit_types:username(), [atom()]) -> 'ok').
--spec(list_users/0 :: () -> rabbit_types:infos()).
+-spec(list_users/0 :: () -> [rabbit_types:infos()]).
-spec(user_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(lookup_user/1 :: (rabbit_types:username())
-> rabbit_types:ok(rabbit_types:internal_user())
@@ -58,14 +58,14 @@
regexp(), regexp(), regexp()) -> 'ok').
-spec(clear_permissions/2 :: (rabbit_types:username(), rabbit_types:vhost())
-> 'ok').
--spec(list_permissions/0 :: () -> rabbit_types:infos()).
+-spec(list_permissions/0 :: () -> [rabbit_types:infos()]).
-spec(list_vhost_permissions/1 ::
- (rabbit_types:vhost()) -> rabbit_types:infos()).
+ (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(list_user_permissions/1 ::
- (rabbit_types:username()) -> rabbit_types:infos()).
+ (rabbit_types:username()) -> [rabbit_types:infos()]).
-spec(list_user_vhost_permissions/2 ::
(rabbit_types:username(), rabbit_types:vhost())
- -> rabbit_types:infos()).
+ -> [rabbit_types:infos()]).
-spec(perms_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(vhost_perms_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(user_perms_info_keys/0 :: () -> rabbit_types:info_keys()).
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 5e13bc58..c2b52a7c 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -26,17 +26,16 @@
-type(msg_ids() :: [rabbit_types:msg_id()]).
-type(fetch_result(Ack) ::
- ('empty' |
- %% Message, IsDelivered, AckTag, Remaining_Len
- {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})).
+ ('empty' | {rabbit_types:basic_message(), boolean(), Ack})).
+-type(drop_result(Ack) ::
+ ('empty' | {rabbit_types:msg_id(), Ack})).
-type(attempt_recovery() :: boolean()).
-type(purged_msg_count() :: non_neg_integer()).
-type(async_callback() ::
fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
-type(duration() :: ('undefined' | 'infinity' | number())).
--type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') |
- 'undefined').
+-type(msg_fun(A) :: fun ((rabbit_types:basic_message(), ack(), A) -> A)).
-type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())).
%% Called on startup with a list of durable queue names. The queues
@@ -72,14 +71,18 @@
%% content.
-callback delete_and_terminate(any(), state()) -> state().
-%% Remove all messages in the queue, but not messages which have been
-%% fetched and are pending acks.
+%% Remove all 'fetchable' messages from the queue, i.e. all messages
+%% except those that have been fetched already and are pending acks.
-callback purge(state()) -> {purged_msg_count(), state()}.
+%% Remove all messages in the queue which have been fetched and are
+%% pending acks.
+-callback purge_acks(state()) -> state().
+
%% Publish a message.
-callback publish(rabbit_types:basic_message(),
- rabbit_types:message_properties(), pid(), state()) ->
- state().
+ rabbit_types:message_properties(), boolean(), pid(),
+ state()) -> state().
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
@@ -124,33 +127,50 @@
%% be ignored.
-callback drain_confirmed(state()) -> {msg_ids(), state()}.
-%% Drop messages from the head of the queue while the supplied predicate returns
-%% true. Also accepts a boolean parameter that determines whether the messages
-%% necessitate an ack or not. If they do, the function returns a list of
-%% messages with the respective acktags.
--callback dropwhile(msg_pred(), true, state())
- -> {rabbit_types:message_properties() | undefined,
- [{rabbit_types:basic_message(), ack()}], state()};
- (msg_pred(), false, state())
- -> {rabbit_types:message_properties() | undefined,
- undefined, state()}.
+%% Drop messages from the head of the queue while the supplied
+%% predicate on message properties returns true. Returns the first
+%% message properties for which the predictate returned false, or
+%% 'undefined' if the whole backing queue was traversed w/o the
+%% predicate ever returning false.
+-callback dropwhile(msg_pred(), state())
+ -> {rabbit_types:message_properties() | undefined, state()}.
+
+%% Like dropwhile, except messages are fetched in "require
+%% acknowledgement" mode and are passed, together with their ack tag,
+%% to the supplied function. The function is also fed an
+%% accumulator. The result of fetchwhile is as for dropwhile plus the
+%% accumulator.
+-callback fetchwhile(msg_pred(), msg_fun(A), A, state())
+ -> {rabbit_types:message_properties() | undefined,
+ A, state()}.
%% Produce the next message.
-callback fetch(true, state()) -> {fetch_result(ack()), state()};
(false, state()) -> {fetch_result(undefined), state()}.
+%% Remove the next message.
+-callback drop(true, state()) -> {drop_result(ack()), state()};
+ (false, state()) -> {drop_result(undefined), state()}.
+
%% Acktags supplied are for messages which can now be forgotten
%% about. Must return 1 msg_id per Ack, in the same order as Acks.
-callback ack([ack()], state()) -> {msg_ids(), state()}.
-%% Acktags supplied are for messages which should be processed. The
-%% provided callback function is called with each message.
--callback fold(msg_fun(), state(), [ack()]) -> state().
-
%% Reinsert messages into the queue which have already been delivered
%% and were pending acknowledgement.
-callback requeue([ack()], state()) -> {msg_ids(), state()}.
+%% Fold over messages by ack tag. The supplied function is called with
+%% each message, its ack tag, and an accumulator.
+-callback ackfold(msg_fun(A), A, state(), [ack()]) -> {A, state()}.
+
+%% Fold over all the messages in a queue and return the accumulated
+%% results, leaving the queue undisturbed.
+-callback fold(fun((rabbit_types:basic_message(),
+ rabbit_types:message_properties(),
+ boolean(), A) -> {('stop' | 'cont'), A}),
+ A, state()) -> {A, state()}.
+
%% How long is my queue?
-callback len(state()) -> non_neg_integer().
@@ -210,9 +230,10 @@
behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
- {delete_and_terminate, 2}, {purge, 1}, {publish, 4},
- {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3},
- {fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1},
+ {delete_and_terminate, 2}, {purge, 1}, {purge_acks, 1}, {publish, 5},
+ {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1},
+ {dropwhile, 2}, {fetchwhile, 4},
+ {fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
{handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ;
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index a028602c..052db3a5 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -85,17 +85,19 @@ backing_queue_test(Cmds) ->
%% Commands
-%% Command frequencies are tuned so that queues are normally reasonably
-%% short, but they may sometimes exceed ?QUEUE_MAXLEN. Publish-multiple
-%% and purging cause extreme queue lengths, so these have lower probabilities.
-%% Fetches are sufficiently frequent so that commands that need acktags
-%% get decent coverage.
+%% Command frequencies are tuned so that queues are normally
+%% reasonably short, but they may sometimes exceed
+%% ?QUEUE_MAXLEN. Publish-multiple and purging cause extreme queue
+%% lengths, so these have lower probabilities. Fetches/drops are
+%% sufficiently frequent so that commands that need acktags get decent
+%% coverage.
command(S) ->
frequency([{10, qc_publish(S)},
{1, qc_publish_delivered(S)},
{1, qc_publish_multiple(S)}, %% very slow
- {15, qc_fetch(S)}, %% needed for ack and requeue
+ {9, qc_fetch(S)}, %% needed for ack and requeue
+ {6, qc_drop(S)}, %%
{15, qc_ack(S)},
{15, qc_requeue(S)},
{3, qc_set_ram_duration_target(S)},
@@ -104,7 +106,8 @@ command(S) ->
{1, qc_dropwhile(S)},
{1, qc_is_empty(S)},
{1, qc_timeout(S)},
- {1, qc_purge(S)}]).
+ {1, qc_purge(S)},
+ {1, qc_fold(S)}]).
qc_publish(#state{bqstate = BQ}) ->
{call, ?BQMOD, publish,
@@ -112,7 +115,7 @@ qc_publish(#state{bqstate = BQ}) ->
#message_properties{needs_confirming = frequency([{1, true},
{20, false}]),
expiry = oneof([undefined | lists:seq(1, 10)])},
- self(), BQ]}.
+ false, self(), BQ]}.
qc_publish_multiple(#state{}) ->
{call, ?MODULE, publish_multiple, [resize(?QUEUE_MAXLEN, pos_integer())]}.
@@ -124,6 +127,9 @@ qc_publish_delivered(#state{bqstate = BQ}) ->
qc_fetch(#state{bqstate = BQ}) ->
{call, ?BQMOD, fetch, [boolean(), BQ]}.
+qc_drop(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, drop, [boolean(), BQ]}.
+
qc_ack(#state{bqstate = BQ, acks = Acks}) ->
{call, ?BQMOD, ack, [rand_choice(proplists:get_keys(Acks)), BQ]}.
@@ -141,7 +147,7 @@ qc_drain_confirmed(#state{bqstate = BQ}) ->
{call, ?BQMOD, drain_confirmed, [BQ]}.
qc_dropwhile(#state{bqstate = BQ}) ->
- {call, ?BQMOD, dropwhile, [fun dropfun/1, false, BQ]}.
+ {call, ?BQMOD, dropwhile, [fun dropfun/1, BQ]}.
qc_is_empty(#state{bqstate = BQ}) ->
{call, ?BQMOD, is_empty, [BQ]}.
@@ -152,6 +158,9 @@ qc_timeout(#state{bqstate = BQ}) ->
qc_purge(#state{bqstate = BQ}) ->
{call, ?BQMOD, purge, [BQ]}.
+qc_fold(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, fold, [makefoldfun(pos_integer()), foldacc(), BQ]}.
+
%% Preconditions
%% Create long queues by only allowing publishing
@@ -173,7 +182,7 @@ precondition(#state{len = Len}, {call, ?MODULE, publish_multiple, _Arg}) ->
%% Model updates
-next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) ->
+next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Del, _Pid, _BQ]}) ->
#state{len = Len,
messages = Messages,
confirms = Confirms,
@@ -217,22 +226,10 @@ next_state(S, Res,
};
next_state(S, Res, {call, ?BQMOD, fetch, [AckReq, _BQ]}) ->
- #state{len = Len, messages = Messages, acks = Acks} = S,
- ResultInfo = {call, erlang, element, [1, Res]},
- BQ1 = {call, erlang, element, [2, Res]},
- AckTag = {call, erlang, element, [3, ResultInfo]},
- S1 = S#state{bqstate = BQ1},
- case gb_trees:is_empty(Messages) of
- true -> S1;
- false -> {SeqId, MsgProp_Msg, M2} = gb_trees:take_smallest(Messages),
- S2 = S1#state{len = Len - 1, messages = M2},
- case AckReq of
- true ->
- S2#state{acks = [{AckTag, {SeqId, MsgProp_Msg}}|Acks]};
- false ->
- S2
- end
- end;
+ next_state_fetch_and_drop(S, Res, AckReq, 3);
+
+next_state(S, Res, {call, ?BQMOD, drop, [AckReq, _BQ]}) ->
+ next_state_fetch_and_drop(S, Res, AckReq, 2);
next_state(S, Res, {call, ?BQMOD, ack, [AcksArg, _BQ]}) ->
#state{acks = AcksState} = S,
@@ -265,7 +262,7 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) ->
S#state{bqstate = BQ1};
next_state(S, Res, {call, ?BQMOD, dropwhile, _Args}) ->
- BQ = {call, erlang, element, [3, Res]},
+ BQ = {call, erlang, element, [2, Res]},
#state{messages = Messages} = S,
Msgs1 = drop_messages(Messages),
S#state{bqstate = BQ, len = gb_trees:size(Msgs1), messages = Msgs1};
@@ -278,19 +275,38 @@ next_state(S, BQ, {call, ?MODULE, timeout, _Args}) ->
next_state(S, Res, {call, ?BQMOD, purge, _Args}) ->
BQ1 = {call, erlang, element, [2, Res]},
- S#state{bqstate = BQ1, len = 0, messages = gb_trees:empty()}.
+ S#state{bqstate = BQ1, len = 0, messages = gb_trees:empty()};
+
+next_state(S, Res, {call, ?BQMOD, fold, _Args}) ->
+ BQ1 = {call, erlang, element, [2, Res]},
+ S#state{bqstate = BQ1}.
%% Postconditions
postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) ->
#state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S,
case Res of
- {{MsgFetched, _IsDelivered, AckTag, RemainingLen}, _BQ} ->
+ {{MsgFetched, _IsDelivered, AckTag}, _BQ} ->
{_SeqId, {_MsgProps, Msg}} = gb_trees:smallest(Messages),
MsgFetched =:= Msg andalso
not proplists:is_defined(AckTag, Acks) andalso
not gb_sets:is_element(AckTag, Confrms) andalso
- RemainingLen =:= Len - 1;
+ Len =/= 0;
+ {empty, _BQ} ->
+ Len =:= 0
+ end;
+
+postcondition(S, {call, ?BQMOD, drop, _Args}, Res) ->
+ #state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S,
+ case Res of
+ {{MsgIdFetched, AckTag}, _BQ} ->
+ {_SeqId, {_MsgProps, Msg}} = gb_trees:smallest(Messages),
+ MsgId = eval({call, erlang, element,
+ [?RECORD_INDEX(id, basic_message), Msg]}),
+ MsgIdFetched =:= MsgId andalso
+ not proplists:is_defined(AckTag, Acks) andalso
+ not gb_sets:is_element(AckTag, Confrms) andalso
+ Len =/= 0;
{empty, _BQ} ->
Len =:= 0
end;
@@ -313,6 +329,15 @@ postcondition(S, {call, ?BQMOD, drain_confirmed, _Args}, Res) ->
lists:all(fun (M) -> gb_sets:is_element(M, Confirms) end,
ReportedConfirmed);
+postcondition(S, {call, ?BQMOD, fold, [FoldFun, Acc0, _BQ0]}, {Res, _BQ1}) ->
+ #state{messages = Messages} = S,
+ {_, Model} = lists:foldl(fun ({_SeqId, {_MsgProps, _Msg}}, {stop, Acc}) ->
+ {stop, Acc};
+ ({_SeqId, {MsgProps, Msg}}, {cont, Acc}) ->
+ FoldFun(Msg, MsgProps, false, Acc)
+ end, {cont, Acc0}, gb_trees:to_list(Messages)),
+ true = Model =:= Res;
+
postcondition(#state{bqstate = BQ, len = Len}, {call, _M, _F, _A}, _Res) ->
?BQMOD:len(BQ) =:= Len.
@@ -371,6 +396,16 @@ rand_choice(List, Selection, N) ->
rand_choice(List -- [Picked], [Picked | Selection],
N - 1).
+makefoldfun(Size) ->
+ fun (Msg, _MsgProps, Unacked, Acc) ->
+ case {length(Acc) > Size, Unacked} of
+ {false, false} -> {cont, [Msg | Acc]};
+ {false, true} -> {cont, Acc};
+ {true, _} -> {stop, Acc}
+ end
+ end.
+foldacc() -> [].
+
dropfun(Props) ->
Expiry = eval({call, erlang, element,
[?RECORD_INDEX(expiry, message_properties), Props]}),
@@ -388,6 +423,24 @@ drop_messages(Messages) ->
end
end.
+next_state_fetch_and_drop(S, Res, AckReq, AckTagIdx) ->
+ #state{len = Len, messages = Messages, acks = Acks} = S,
+ ResultInfo = {call, erlang, element, [1, Res]},
+ BQ1 = {call, erlang, element, [2, Res]},
+ AckTag = {call, erlang, element, [AckTagIdx, ResultInfo]},
+ S1 = S#state{bqstate = BQ1},
+ case gb_trees:is_empty(Messages) of
+ true -> S1;
+ false -> {SeqId, MsgProp_Msg, M2} = gb_trees:take_smallest(Messages),
+ S2 = S1#state{len = Len - 1, messages = M2},
+ case AckReq of
+ true ->
+ S2#state{acks = [{AckTag, {SeqId, MsgProp_Msg}}|Acks]};
+ false ->
+ S2
+ end
+ end.
+
-else.
-export([prop_disabled/0]).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a715b291..160512a2 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -33,10 +33,10 @@
-export([list_local/0]).
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
- conn_name, limiter, tx_status, next_tag, unacked_message_q,
- uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user,
- virtual_host, most_recently_declared_queue, queue_monitors,
- consumer_mapping, blocking, queue_consumers, delivering_queues,
+ conn_name, limiter, tx, next_tag, unacked_message_q, user,
+ virtual_host, most_recently_declared_queue,
+ queue_names, queue_monitors, consumer_mapping,
+ blocking, queue_consumers, delivering_queues,
queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
unconfirmed, confirmed, capabilities, trace_state}).
@@ -64,6 +64,12 @@
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+-define(INCR_STATS(Incs, Measure, State),
+ case rabbit_event:stats_level(State, #ch.stats_timer) of
+ fine -> incr_stats(Incs, Measure);
+ _ -> ok
+ end).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -185,15 +191,13 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
conn_pid = ConnPid,
conn_name = ConnName,
limiter = Limiter,
- tx_status = none,
+ tx = none,
next_tag = 1,
unacked_message_q = queue:new(),
- uncommitted_message_q = queue:new(),
- uncommitted_acks = [],
- uncommitted_nacks = [],
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
+ queue_names = dict:new(),
queue_monitors = pmon:new(),
consumer_mapping = dict:new(),
blocking = sets:new(),
@@ -314,9 +318,12 @@ handle_cast({deliver, ConsumerTag, AckRequired,
handle_cast(force_event_refresh, State) ->
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
noreply(State);
+
handle_cast({confirm, MsgSeqNos, From}, State) ->
State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State),
- noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end).
+ Timeout = case C of [] -> hibernate; _ -> 0 end,
+ %% NB: don't call noreply/1 since we don't want to send confirms.
+ {noreply, ensure_stats_timer(State1), Timeout}.
handle_info({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
@@ -327,8 +334,10 @@ handle_info(timeout, State) ->
handle_info(emit_stats, State) ->
emit_stats(State),
- noreply([ensure_stats_timer],
- rabbit_event:reset_stats_timer(State, #ch.stats_timer));
+ State1 = rabbit_event:reset_stats_timer(State, #ch.stats_timer),
+ %% NB: don't call noreply/1 since we don't want to kick off the
+ %% stats timer.
+ {noreply, send_confirms(State1), hibernate};
handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
@@ -336,9 +345,13 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State3 = handle_consuming_queue_down(QPid, State2),
State4 = handle_delivering_queue_down(QPid, State3),
credit_flow:peer_down(QPid),
- erase_queue_stats(QPid),
- noreply(State4#ch{queue_monitors = pmon:erase(
- QPid, State4#ch.queue_monitors)});
+ #ch{queue_names = QNames, queue_monitors = QMons} = State4,
+ case dict:find(QPid, QNames) of
+ {ok, QName} -> erase_queue_stats(QName);
+ error -> ok
+ end,
+ noreply(State4#ch{queue_names = dict:erase(QPid, QNames),
+ queue_monitors = pmon:erase(QPid, QMons)});
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -368,30 +381,11 @@ format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
%%---------------------------------------------------------------------------
-reply(Reply, NewState) -> reply(Reply, [], NewState).
-
-reply(Reply, Mask, NewState) -> reply(Reply, Mask, NewState, hibernate).
-
-reply(Reply, Mask, NewState, Timeout) ->
- {reply, Reply, next_state(Mask, NewState), Timeout}.
+reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.
-noreply(NewState) -> noreply([], NewState).
+noreply(NewState) -> {noreply, next_state(NewState), hibernate}.
-noreply(Mask, NewState) -> noreply(Mask, NewState, hibernate).
-
-noreply(Mask, NewState, Timeout) ->
- {noreply, next_state(Mask, NewState), Timeout}.
-
--define(MASKED_CALL(Fun, Mask, State),
- case lists:member(Fun, Mask) of
- true -> State;
- false -> Fun(State)
- end).
-
-next_state(Mask, State) ->
- State1 = ?MASKED_CALL(ensure_stats_timer, Mask, State),
- State2 = ?MASKED_CALL(send_confirms, Mask, State1),
- State2.
+next_state(State) -> ensure_stats_timer(send_confirms(State)).
ensure_stats_timer(State) ->
rabbit_event:ensure_stats_timer(State, #ch.stats_timer, emit_stats).
@@ -425,8 +419,14 @@ handle_exception(Reason, State = #ch{protocol = Protocol,
{stop, normal, State1}
end.
+-ifdef(use_specs).
+-spec(precondition_failed/1 :: (string()) -> no_return()).
+-endif.
precondition_failed(Format) -> precondition_failed(Format, []).
+-ifdef(use_specs).
+-spec(precondition_failed/2 :: (string(), [any()]) -> no_return()).
+-endif.
precondition_failed(Format, Params) ->
rabbit_misc:protocol_error(precondition_failed, Format, Params).
@@ -443,15 +443,13 @@ check_resource_access(User, Resource, Perm) ->
undefined -> [];
Other -> Other
end,
- CacheTail =
- case lists:member(V, Cache) of
- true -> lists:delete(V, Cache);
- false -> ok = rabbit_access_control:check_resource_access(
- User, Resource, Perm),
- lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE - 1)
- end,
- put(permission_cache, [V | CacheTail]),
- ok.
+ case lists:member(V, Cache) of
+ true -> ok;
+ false -> ok = rabbit_access_control:check_resource_access(
+ User, Resource, Perm),
+ CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE-1),
+ put(permission_cache, [V | CacheTail])
+ end.
clear_permission_cache() ->
erase(permission_cache),
@@ -530,16 +528,12 @@ check_not_default_exchange(_) ->
%% check that an exchange/queue name does not contain the reserved
%% "amq." prefix.
%%
-%% One, quite reasonable, interpretation of the spec, taken by the
-%% QPid M1 Java client, is that the exclusion of "amq." prefixed names
+%% As per the AMQP 0-9-1 spec, the exclusion of "amq." prefixed names
%% only applies on actual creation, and not in the cases where the
-%% entity already exists. This is how we use this function in the code
-%% below. However, AMQP JIRA 123 changes that in 0-10, and possibly
-%% 0-9SP1, making it illegal to attempt to declare an exchange/queue
-%% with an amq.* name when passive=false. So this will need
-%% revisiting.
+%% entity already exists or passive=true.
%%
-%% TODO: enforce other constraints on name. See AMQP JIRA 69.
+%% NB: We deliberately do not enforce the other constraints on names
+%% required by the spec.
check_name(Kind, NameBin = <<"amq.", _/binary>>) ->
rabbit_misc:protocol_error(
access_refused,
@@ -558,11 +552,6 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-record_confirm(undefined, _, State) ->
- State;
-record_confirm(MsgSeqNo, XName, State) ->
- record_confirms([{MsgSeqNo, XName}], State).
-
record_confirms([], State) ->
State;
record_confirms(MXs, State = #ch{confirmed = C}) ->
@@ -604,8 +593,8 @@ handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) ->
%% while waiting for the reply to a synchronous command, we generally
%% do allow this...except in the case of a pending tx.commit, where
%% it could wreak havoc.
-handle_method(_Method, _, #ch{tx_status = TxStatus})
- when TxStatus =/= none andalso TxStatus =/= in_progress ->
+handle_method(_Method, _, #ch{tx = Tx})
+ when Tx =:= committing orelse Tx =:= failed ->
rabbit_misc:protocol_error(
channel_error, "unexpected command while processing 'tx.commit'", []);
@@ -619,7 +608,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory},
Content, State = #ch{virtual_host = VHostPath,
- tx_status = TxStatus,
+ tx = Tx,
confirm_enabled = ConfirmEnabled,
trace_state = TraceState}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
@@ -633,23 +622,22 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
check_user_id_header(Props, State),
check_expiration_header(Props),
{MsgSeqNo, State1} =
- case {TxStatus, ConfirmEnabled} of
+ case {Tx, ConfirmEnabled} of
{none, false} -> {undefined, State};
{_, _} -> SeqNo = State#ch.publish_seqno,
{SeqNo, State#ch{publish_seqno = SeqNo + 1}}
end,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
- rabbit_trace:tap_trace_in(Message, TraceState),
+ rabbit_trace:tap_in(Message, TraceState),
Delivery = rabbit_basic:delivery(Mandatory, Message, MsgSeqNo),
QNames = rabbit_exchange:route(Exchange, Delivery),
- {noreply,
- case TxStatus of
- none -> deliver_to_queues({Delivery, QNames}, State1);
- in_progress -> TMQ = State1#ch.uncommitted_message_q,
- NewTMQ = queue:in({Delivery, QNames}, TMQ),
- State1#ch{uncommitted_message_q = NewTMQ}
- end};
+ DQ = {Delivery, QNames},
+ {noreply, case Tx of
+ none -> deliver_to_queues(DQ, State1);
+ {Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs),
+ State1#ch{tx = {Msgs1, Acks}}
+ end};
{error, Reason} ->
precondition_failed("invalid message: ~p", [Reason])
end;
@@ -662,16 +650,15 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
- _, State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) ->
+ _, State = #ch{unacked_message_q = UAMQ, tx = Tx}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
State1 = State#ch{unacked_message_q = Remaining},
- {noreply,
- case TxStatus of
- none -> ack(Acked, State1),
- State1;
- in_progress -> State1#ch{uncommitted_acks =
- Acked ++ State1#ch.uncommitted_acks}
- end};
+ {noreply, case Tx of
+ none -> ack(Acked, State1),
+ State1;
+ {Msgs, Acks} -> Acks1 = ack_cons(ack, Acked, Acks),
+ State1#ch{tx = {Msgs, Acks1}}
+ end};
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
@@ -684,7 +671,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
- Msg = {_QName, QPid, _MsgId, Redelivered,
+ Msg = {QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
routing_keys = [RoutingKey | _CcRoutes],
content = Content}}} ->
@@ -696,7 +683,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
routing_key = RoutingKey,
message_count = MessageCount},
Content),
- State1 = monitor_delivering_queue(NoAck, QPid, State),
+ State1 = monitor_delivering_queue(NoAck, QPid, QName, State),
{noreply, record_sent(none, not(NoAck), Msg, State1)};
empty ->
{reply, #'basic.get_empty'{}, State}
@@ -735,10 +722,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
consumer_tag = ActualConsumerTag})),
Q}
end) of
- {ok, Q = #amqqueue{pid = QPid}} ->
+ {ok, Q = #amqqueue{pid = QPid, name = QName}} ->
CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping),
State1 = monitor_delivering_queue(
- NoAck, QPid, State#ch{consumer_mapping = CM1}),
+ NoAck, QPid, QName,
+ State#ch{consumer_mapping = CM1}),
{noreply,
case NoWait of
true -> consumer_monitor(ActualConsumerTag, State1);
@@ -822,14 +810,12 @@ handle_method(#'basic.recover_async'{requeue = true},
limiter = Limiter}) ->
OkFun = fun () -> ok end,
UAMQL = queue:to_list(UAMQ),
- ok = fold_per_queue(
- fun (QPid, MsgIds, ok) ->
- rabbit_misc:with_exit_handler(
- OkFun, fun () ->
- rabbit_amqqueue:requeue(
- QPid, MsgIds, self())
- end)
- end, ok, UAMQL),
+ foreach_per_queue(
+ fun (QPid, MsgIds) ->
+ rabbit_misc:with_exit_handler(
+ OkFun,
+ fun () -> rabbit_amqqueue:requeue(QPid, MsgIds, self()) end)
+ end, lists:reverse(UAMQL)),
ok = notify_limiter(Limiter, UAMQL),
%% No answer required - basic.recover is the newer, synchronous
%% variant of this method
@@ -1044,34 +1030,34 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) ->
precondition_failed("cannot switch from confirm to tx mode");
+handle_method(#'tx.select'{}, _, State = #ch{tx = none}) ->
+ {reply, #'tx.select_ok'{}, State#ch{tx = new_tx()}};
+
handle_method(#'tx.select'{}, _, State) ->
- {reply, #'tx.select_ok'{}, State#ch{tx_status = in_progress}};
+ {reply, #'tx.select_ok'{}, State};
-handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
+handle_method(#'tx.commit'{}, _, #ch{tx = none}) ->
precondition_failed("channel is not transactional");
-handle_method(#'tx.commit'{}, _,
- State = #ch{uncommitted_message_q = TMQ,
- uncommitted_acks = TAL,
- uncommitted_nacks = TNL,
- limiter = Limiter}) ->
- State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ),
- ack(TAL, State1),
- lists:foreach(
- fun({Requeue, Acked}) -> reject(Requeue, Acked, Limiter) end, TNL),
- {noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))};
-
-handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
+handle_method(#'tx.commit'{}, _, State = #ch{tx = {Msgs, Acks},
+ limiter = Limiter}) ->
+ State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs),
+ lists:foreach(fun ({ack, A}) -> ack(A, State1);
+ ({Requeue, A}) -> reject(Requeue, A, Limiter)
+ end, lists:reverse(Acks)),
+ {noreply, maybe_complete_tx(State1#ch{tx = committing})};
+
+handle_method(#'tx.rollback'{}, _, #ch{tx = none}) ->
precondition_failed("channel is not transactional");
handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
- uncommitted_acks = TAL,
- uncommitted_nacks = TNL}) ->
- TNL1 = lists:append([L || {_, L} <- TNL]),
- UAMQ1 = queue:from_list(lists:usort(TAL ++ TNL1 ++ queue:to_list(UAMQ))),
- {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = UAMQ1})};
+ tx = {_Msgs, Acks}}) ->
+ AcksL = lists:append(lists:reverse([lists:reverse(L) || {_, L} <- Acks])),
+ UAMQ1 = queue:from_list(lists:usort(AcksL ++ queue:to_list(UAMQ))),
+ {reply, #'tx.rollback_ok'{}, State#ch{unacked_message_q = UAMQ1,
+ tx = new_tx()}};
-handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) ->
+handle_method(#'confirm.select'{}, _, #ch{tx = {_, _}}) ->
precondition_failed("cannot switch from tx to confirm mode");
handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
@@ -1130,9 +1116,12 @@ consumer_monitor(ConsumerTag,
State
end.
-monitor_delivering_queue(NoAck, QPid, State = #ch{queue_monitors = QMons,
- delivering_queues = DQ}) ->
- State#ch{queue_monitors = pmon:monitor(QPid, QMons),
+monitor_delivering_queue(NoAck, QPid, QName,
+ State = #ch{queue_names = QNames,
+ queue_monitors = QMons,
+ delivering_queues = DQ}) ->
+ State#ch{queue_names = dict:store(QPid, QName, QNames),
+ queue_monitors = pmon:monitor(QPid, QMons),
delivering_queues = case NoAck of
true -> DQ;
false -> sets:add_element(QPid, DQ)
@@ -1215,42 +1204,40 @@ basic_return(#basic_message{exchange_name = ExchangeName,
Content).
reject(DeliveryTag, Requeue, Multiple,
- State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) ->
+ State = #ch{unacked_message_q = UAMQ, tx = Tx}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
State1 = State#ch{unacked_message_q = Remaining},
- {noreply,
- case TxStatus of
- none ->
- reject(Requeue, Acked, State1#ch.limiter),
- State1;
- in_progress ->
- State1#ch{uncommitted_nacks =
- [{Requeue, Acked} | State1#ch.uncommitted_nacks]}
- end}.
-
+ {noreply, case Tx of
+ none -> reject(Requeue, Acked, State1#ch.limiter),
+ State1;
+ {Msgs, Acks} -> Acks1 = ack_cons(Requeue, Acked, Acks),
+ State1#ch{tx = {Msgs, Acks1}}
+ end}.
+
+%% NB: Acked is in youngest-first order
reject(Requeue, Acked, Limiter) ->
- ok = fold_per_queue(
- fun (QPid, MsgIds, ok) ->
- rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
- end, ok, Acked),
+ foreach_per_queue(
+ fun (QPid, MsgIds) ->
+ rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
+ end, Acked),
ok = notify_limiter(Limiter, Acked).
record_sent(ConsumerTag, AckRequired,
- Msg = {_QName, QPid, MsgId, Redelivered, _Message},
+ Msg = {QName, QPid, MsgId, Redelivered, _Message},
State = #ch{unacked_message_q = UAMQ,
next_tag = DeliveryTag,
trace_state = TraceState}) ->
- incr_stats([{queue_stats, QPid, 1}], case {ConsumerTag, AckRequired} of
- {none, true} -> get;
- {none, false} -> get_no_ack;
- {_ , true} -> deliver;
- {_ , false} -> deliver_no_ack
- end, State),
+ ?INCR_STATS([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of
+ {none, true} -> get;
+ {none, false} -> get_no_ack;
+ {_ , true} -> deliver;
+ {_ , false} -> deliver_no_ack
+ end, State),
case Redelivered of
- true -> incr_stats([{queue_stats, QPid, 1}], redeliver, State);
+ true -> ?INCR_STATS([{queue_stats, QName, 1}], redeliver, State);
false -> ok
end,
- rabbit_trace:tap_trace_out(Msg, TraceState),
+ rabbit_trace:tap_out(Msg, TraceState),
UAMQ1 = case AckRequired of
true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}},
UAMQ);
@@ -1258,40 +1245,61 @@ record_sent(ConsumerTag, AckRequired,
end,
State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}.
+%% NB: returns acks in youngest-first order
collect_acks(Q, 0, true) ->
- {queue:to_list(Q), queue:new()};
+ {lists:reverse(queue:to_list(Q)), queue:new()};
collect_acks(Q, DeliveryTag, Multiple) ->
- collect_acks([], queue:new(), Q, DeliveryTag, Multiple).
+ collect_acks([], [], Q, DeliveryTag, Multiple).
collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
case queue:out(Q) of
{{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}},
QTail} ->
if CurrentDeliveryTag == DeliveryTag ->
- {[UnackedMsg | ToAcc], queue:join(PrefixAcc, QTail)};
+ {[UnackedMsg | ToAcc],
+ case PrefixAcc of
+ [] -> QTail;
+ _ -> queue:join(
+ queue:from_list(lists:reverse(PrefixAcc)),
+ QTail)
+ end};
Multiple ->
collect_acks([UnackedMsg | ToAcc], PrefixAcc,
QTail, DeliveryTag, Multiple);
true ->
- collect_acks(ToAcc, queue:in(UnackedMsg, PrefixAcc),
+ collect_acks(ToAcc, [UnackedMsg | PrefixAcc],
QTail, DeliveryTag, Multiple)
end;
{empty, _} ->
precondition_failed("unknown delivery tag ~w", [DeliveryTag])
end.
-ack(Acked, State) ->
- Incs = fold_per_queue(
- fun (QPid, MsgIds, L) ->
- ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
- [{queue_stats, QPid, length(MsgIds)} | L]
- end, [], Acked),
- ok = notify_limiter(State#ch.limiter, Acked),
- incr_stats(Incs, ack, State).
-
-new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
- uncommitted_acks = [],
- uncommitted_nacks = []}.
+%% NB: Acked is in youngest-first order
+ack(Acked, State = #ch{queue_names = QNames}) ->
+ foreach_per_queue(
+ fun (QPid, MsgIds) ->
+ ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
+ ?INCR_STATS(case dict:find(QPid, QNames) of
+ {ok, QName} -> Count = length(MsgIds),
+ [{queue_stats, QName, Count}];
+ error -> []
+ end, ack, State)
+ end, Acked),
+ ok = notify_limiter(State#ch.limiter, Acked).
+
+%% {Msgs, Acks}
+%%
+%% Msgs is a queue.
+%%
+%% Acks looks s.t. like this:
+%% [{false,[5,4]},{true,[3]},{ack,[2,1]}, ...]
+%%
+%% Each element is a pair consisting of a tag and a list of
+%% ack'ed/reject'ed msg ids. The tag is one of 'ack' (to ack), 'true'
+%% (reject w requeue), 'false' (reject w/o requeue). The msg ids, as
+%% well as the list overall, are in "most-recent (generally youngest)
+%% ack first" order.
+new_tx() -> {queue:new(), []}.
notify_queues(State = #ch{state = closing}) ->
{ok, State};
@@ -1301,15 +1309,17 @@ notify_queues(State = #ch{consumer_mapping = Consumers,
sets:union(sets:from_list(consumer_queues(Consumers)), DQ)),
{rabbit_amqqueue:notify_down_all(QPids, self()), State#ch{state = closing}}.
-fold_per_queue(_F, Acc, []) ->
- Acc;
-fold_per_queue(F, Acc, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case
- F(QPid, [MsgId], Acc);
-fold_per_queue(F, Acc, UAL) ->
+foreach_per_queue(_F, []) ->
+ ok;
+foreach_per_queue(F, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case
+ F(QPid, [MsgId]);
+%% NB: UAL should be in youngest-first order; the tree values will
+%% then be in oldest-first order
+foreach_per_queue(F, UAL) ->
T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) ->
rabbit_misc:gb_trees_cons(QPid, MsgId, T)
end, gb_trees:empty(), UAL),
- rabbit_misc:gb_trees_fold(F, Acc, T).
+ rabbit_misc:gb_trees_foreach(F, T).
enable_limiter(State = #ch{unacked_message_q = UAMQ,
limiter = Limiter}) ->
@@ -1332,65 +1342,93 @@ notify_limiter(Limiter, Acked) ->
case rabbit_limiter:is_enabled(Limiter) of
false -> ok;
true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
- ({_, _, _}, Acc) -> Acc + 1
+ ({_, _, _}, Acc) -> Acc + 1
end, 0, Acked) of
0 -> ok;
Count -> rabbit_limiter:ack(Limiter, Count)
end
end.
+deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName},
+ msg_seq_no = undefined,
+ mandatory = false},
+ []}, State) -> %% optimisation
+ ?INCR_STATS([{exchange_stats, XName, 1}], publish, State),
+ State;
deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
exchange_name = XName},
msg_seq_no = MsgSeqNo},
- QNames}, State) ->
- {RoutingRes, DeliveredQPids} =
- rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery),
- State1 = State#ch{queue_monitors =
- pmon:monitor_all(DeliveredQPids,
- State#ch.queue_monitors)},
- State2 = process_routing_result(RoutingRes, DeliveredQPids,
- XName, MsgSeqNo, Message, State1),
- incr_stats([{exchange_stats, XName, 1} |
- [{queue_exchange_stats, {QPid, XName}, 1} ||
- QPid <- DeliveredQPids]], publish, State2),
- State2.
+ DelQNames}, State = #ch{queue_names = QNames,
+ queue_monitors = QMons}) ->
+ Qs = rabbit_amqqueue:lookup(DelQNames),
+ {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver_flow(Qs, Delivery),
+ %% The pmon:monitor_all/2 monitors all queues to which we
+ %% delivered. But we want to monitor even queues we didn't deliver
+ %% to, since we need their 'DOWN' messages to clean
+ %% queue_names. So we also need to monitor each QPid from
+ %% queues. But that only gets the masters (which is fine for
+ %% cleaning queue_names), so we need the union of both.
+ %%
+ %% ...and we need to add even non-delivered queues to queue_names
+ %% since alternative algorithms to update queue_names less
+ %% frequently would in fact be more expensive in the common case.
+ {QNames1, QMons1} =
+ lists:foldl(fun (#amqqueue{pid = QPid, name = QName},
+ {QNames0, QMons0}) ->
+ {case dict:is_key(QPid, QNames0) of
+ true -> QNames0;
+ false -> dict:store(QPid, QName, QNames0)
+ end, pmon:monitor(QPid, QMons0)}
+ end, {QNames, pmon:monitor_all(DeliveredQPids, QMons)}, Qs),
+ State1 = process_routing_result(RoutingRes, DeliveredQPids,
+ XName, MsgSeqNo, Message,
+ State#ch{queue_names = QNames1,
+ queue_monitors = QMons1}),
+ ?INCR_STATS([{exchange_stats, XName, 1} |
+ [{queue_exchange_stats, {QName, XName}, 1} ||
+ QPid <- DeliveredQPids,
+ {ok, QName} <- [dict:find(QPid, QNames1)]]],
+ publish, State1),
+ State1.
-process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
- ok = basic_return(Msg, State, no_route),
- incr_stats([{exchange_stats, Msg#basic_message.exchange_name, 1}],
- return_unroutable, State),
- record_confirm(MsgSeqNo, XName, State);
-process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
- record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, _, _, undefined, _, State) ->
State;
+process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
+ record_confirms([{MsgSeqNo, XName}], State);
process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName,
- State#ch.unconfirmed)}.
+ State#ch.unconfirmed)};
+process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
+ ok = basic_return(Msg, State, no_route),
+ ?INCR_STATS([{exchange_stats, XName, 1}], return_unroutable, State),
+ case MsgSeqNo of
+ undefined -> State;
+ _ -> record_confirms([{MsgSeqNo, XName}], State)
+ end.
send_nacks([], State) ->
State;
-send_nacks(_MXs, State = #ch{state = closing,
- tx_status = none}) -> %% optimisation
+send_nacks(_MXs, State = #ch{state = closing,
+ tx = none}) -> %% optimisation
State;
-send_nacks(MXs, State = #ch{tx_status = none}) ->
+send_nacks(MXs, State = #ch{tx = none}) ->
coalesce_and_send([MsgSeqNo || {MsgSeqNo, _} <- MXs],
fun(MsgSeqNo, Multiple) ->
#'basic.nack'{delivery_tag = MsgSeqNo,
multiple = Multiple}
end, State);
send_nacks(_MXs, State = #ch{state = closing}) -> %% optimisation
- State#ch{tx_status = failed};
+ State#ch{tx = failed};
send_nacks(_, State) ->
- maybe_complete_tx(State#ch{tx_status = failed}).
+ maybe_complete_tx(State#ch{tx = failed}).
-send_confirms(State = #ch{tx_status = none, confirmed = []}) ->
+send_confirms(State = #ch{tx = none, confirmed = []}) ->
State;
-send_confirms(State = #ch{tx_status = none, confirmed = C}) ->
+send_confirms(State = #ch{tx = none, confirmed = C}) ->
MsgSeqNos =
lists:foldl(
fun ({MsgSeqNo, XName}, MSNs) ->
- incr_stats([{exchange_stats, XName, 1}], confirm, State),
+ ?INCR_STATS([{exchange_stats, XName, 1}], confirm, State),
[MsgSeqNo | MSNs]
end, [], lists:append(C)),
send_confirms(MsgSeqNos, State#ch{confirmed = []});
@@ -1424,7 +1462,12 @@ coalesce_and_send(MsgSeqNos, MkMsgFun, State = #ch{unconfirmed = UC}) ->
[ok = send(MkMsgFun(SeqNo, false), State) || SeqNo <- Ss],
State.
-maybe_complete_tx(State = #ch{tx_status = in_progress}) ->
+ack_cons(Tag, Acked, [{Tag, Acks} | L]) -> [{Tag, Acked ++ Acks} | L];
+ack_cons(Tag, Acked, Acks) -> [{Tag, Acked} | Acks].
+
+ack_len(Acks) -> lists:sum([length(L) || {ack, L} <- Acks]).
+
+maybe_complete_tx(State = #ch{tx = {_, _}}) ->
State;
maybe_complete_tx(State = #ch{unconfirmed = UC}) ->
case dtree:is_empty(UC) of
@@ -1432,16 +1475,16 @@ maybe_complete_tx(State = #ch{unconfirmed = UC}) ->
true -> complete_tx(State#ch{confirmed = []})
end.
-complete_tx(State = #ch{tx_status = committing}) ->
+complete_tx(State = #ch{tx = committing}) ->
ok = send(#'tx.commit_ok'{}, State),
- State#ch{tx_status = in_progress};
-complete_tx(State = #ch{tx_status = failed}) ->
+ State#ch{tx = new_tx()};
+complete_tx(State = #ch{tx = failed}) ->
{noreply, State1} = handle_exception(
rabbit_misc:amqp_error(
precondition_failed, "partial tx completion", [],
'tx.commit'),
State),
- State1#ch{tx_status = in_progress}.
+ State1#ch{tx = new_tx()}.
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
@@ -1450,19 +1493,16 @@ i(connection, #ch{conn_pid = ConnPid}) -> ConnPid;
i(number, #ch{channel = Channel}) -> Channel;
i(user, #ch{user = User}) -> User#user.username;
i(vhost, #ch{virtual_host = VHost}) -> VHost;
-i(transactional, #ch{tx_status = TE}) -> TE =/= none;
+i(transactional, #ch{tx = Tx}) -> Tx =/= none;
i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(name, State) -> name(State);
-i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
- dict:size(ConsumerMapping);
-i(messages_unconfirmed, #ch{unconfirmed = UC}) ->
- dtree:size(UC);
-i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) ->
- queue:len(UAMQ);
-i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) ->
- queue:len(TMQ);
-i(acks_uncommitted, #ch{uncommitted_acks = TAL}) ->
- length(TAL);
+i(consumer_count, #ch{consumer_mapping = CM}) -> dict:size(CM);
+i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC);
+i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ);
+i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> queue:len(Msgs);
+i(messages_uncommitted, #ch{}) -> 0;
+i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks);
+i(acks_uncommitted, #ch{}) -> 0;
i(prefetch_count, #ch{limiter = Limiter}) ->
rabbit_limiter:get_limit(Limiter);
i(client_flow_blocked, #ch{limiter = Limiter}) ->
@@ -1473,12 +1513,8 @@ i(Item, _) ->
name(#ch{conn_name = ConnName, channel = Channel}) ->
list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])).
-incr_stats(Incs, Measure, State) ->
- case rabbit_event:stats_level(State, #ch.stats_timer) of
- fine -> [update_measures(Type, Key, Inc, Measure) ||
- {Type, Key, Inc} <- Incs];
- _ -> ok
- end.
+incr_stats(Incs, Measure) ->
+ [update_measures(Type, Key, Inc, Measure) || {Type, Key, Inc} <- Incs].
update_measures(Type, Key, Inc, Measure) ->
Measures = case get({Type, Key}) of
@@ -1495,24 +1531,23 @@ emit_stats(State) ->
emit_stats(State, []).
emit_stats(State, Extra) ->
- CoarseStats = infos(?STATISTICS_KEYS, State),
+ Coarse = infos(?STATISTICS_KEYS, State),
case rabbit_event:stats_level(State, #ch.stats_timer) of
- coarse ->
- rabbit_event:notify(channel_stats, Extra ++ CoarseStats);
- fine ->
- FineStats =
- [{channel_queue_stats,
- [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]},
- {channel_exchange_stats,
- [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]},
- {channel_queue_exchange_stats,
- [{QX, Stats} ||
- {{queue_exchange_stats, QX}, Stats} <- get()]}],
- rabbit_event:notify(channel_stats,
- Extra ++ CoarseStats ++ FineStats)
+ coarse -> rabbit_event:notify(channel_stats, Extra ++ Coarse);
+ fine -> Fine = [{channel_queue_stats,
+ [{QName, Stats} ||
+ {{queue_stats, QName}, Stats} <- get()]},
+ {channel_exchange_stats,
+ [{XName, Stats} ||
+ {{exchange_stats, XName}, Stats} <- get()]},
+ {channel_queue_exchange_stats,
+ [{QX, Stats} ||
+ {{queue_exchange_stats, QX}, Stats} <- get()]}],
+ rabbit_event:notify(channel_stats, Extra ++ Coarse ++ Fine)
end.
-erase_queue_stats(QPid) ->
- erase({queue_stats, QPid}),
+erase_queue_stats(QName) ->
+ erase({queue_stats, QName}),
[erase({queue_exchange_stats, QX}) ||
- {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0].
+ {{queue_exchange_stats, QX = {QName0, _}}, _} <- get(),
+ QName0 =:= QName].
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index cc29e41c..31bc51b8 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -42,16 +42,11 @@ start_link() ->
SupPid,
{collector, {rabbit_queue_collector, start_link, []},
intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}),
- {ok, ChannelSupSupPid} =
- supervisor2:start_child(
- SupPid,
- {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
- intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}),
{ok, ReaderPid} =
supervisor2:start_child(
SupPid,
{reader, {rabbit_reader, start_link,
- [ChannelSupSupPid, Collector,
+ [SupPid, Collector,
rabbit_heartbeat:start_heartbeat_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}),
{ok, SupPid, ReaderPid}.
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 6a00a0cb..f5e70365 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -17,7 +17,7 @@
-module(rabbit_control_main).
-include("rabbit.hrl").
--export([start/0, stop/0, action/5]).
+-export([start/0, stop/0, action/5, sync_queue/1, cancel_sync_queue/1]).
-define(RPC_TIMEOUT, infinity).
-define(EXTERNAL_CHECK_INTERVAL, 1000).
@@ -50,6 +50,8 @@
update_cluster_nodes,
{forget_cluster_node, [?OFFLINE_DEF]},
cluster_status,
+ {sync_queue, [?VHOST_DEF]},
+ {cancel_sync_queue, [?VHOST_DEF]},
add_user,
delete_user,
@@ -159,6 +161,12 @@ start() ->
false -> io:format("...done.~n")
end,
rabbit_misc:quit(0);
+ {ok, Info} ->
+ case Quiet of
+ true -> ok;
+ false -> io:format("...done (~p).~n", [Info])
+ end,
+ rabbit_misc:quit(0);
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> %% < R15
PrintInvalidCommandError(),
usage();
@@ -280,6 +288,18 @@ action(forget_cluster_node, Node, [ClusterNodeS], Opts, Inform) ->
rpc_call(Node, rabbit_mnesia, forget_cluster_node,
[ClusterNode, RemoveWhenOffline]);
+action(sync_queue, Node, [Q], Opts, Inform) ->
+ VHost = proplists:get_value(?VHOST_OPT, Opts),
+ QName = rabbit_misc:r(list_to_binary(VHost), queue, list_to_binary(Q)),
+ Inform("Synchronising ~s", [rabbit_misc:rs(QName)]),
+ rpc_call(Node, rabbit_control_main, sync_queue, [QName]);
+
+action(cancel_sync_queue, Node, [Q], Opts, Inform) ->
+ VHost = proplists:get_value(?VHOST_OPT, Opts),
+ QName = rabbit_misc:r(list_to_binary(VHost), queue, list_to_binary(Q)),
+ Inform("Stopping synchronising ~s", [rabbit_misc:rs(QName)]),
+ rpc_call(Node, rabbit_control_main, cancel_sync_queue, [QName]);
+
action(wait, Node, [PidFile], _Opts, Inform) ->
Inform("Waiting for ~p", [Node]),
wait_for_application(Node, PidFile, rabbit_and_plugins, Inform);
@@ -513,6 +533,16 @@ action(eval, Node, [Expr], _Opts, _Inform) ->
format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)).
+sync_queue(Q) ->
+ rabbit_amqqueue:with(
+ Q, fun(#amqqueue{pid = QPid}) -> rabbit_amqqueue:sync_mirrors(QPid) end).
+
+cancel_sync_queue(Q) ->
+ rabbit_amqqueue:with(
+ Q, fun(#amqqueue{pid = QPid}) ->
+ rabbit_amqqueue:cancel_sync_mirrors(QPid)
+ end).
+
%%----------------------------------------------------------------------------
wait_for_application(Node, PidFile, Application, Inform) ->
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 10f8ceb8..a91a9916 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -110,18 +110,18 @@ ensure_stats_timer(C, P, Msg) ->
stop_stats_timer(C, P) ->
case element(P, C) of
- #state{level = Level, timer = TRef} = State
- when Level =/= none andalso TRef =/= undefined ->
- erlang:cancel_timer(TRef),
- setelement(P, C, State#state{timer = undefined});
+ #state{timer = TRef} = State when TRef =/= undefined ->
+ case erlang:cancel_timer(TRef) of
+ false -> C;
+ _ -> setelement(P, C, State#state{timer = undefined})
+ end;
#state{} ->
C
end.
reset_stats_timer(C, P) ->
case element(P, C) of
- #state{timer = TRef} = State
- when TRef =/= undefined ->
+ #state{timer = TRef} = State when TRef =/= undefined ->
setelement(P, C, State#state{timer = undefined});
#state{} ->
C
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 2fba941f..88033f77 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -39,8 +39,7 @@
-spec(recover/0 :: () -> [name()]).
-spec(callback/4::
(rabbit_types:exchange(), fun_name(),
- fun((boolean()) -> non_neg_integer()) | atom(),
- [any()]) -> 'ok').
+ fun((boolean()) -> non_neg_integer()) | atom(), [any()]) -> 'ok').
-spec(policy_changed/2 ::
(rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok').
-spec(declare/6 ::
@@ -114,26 +113,19 @@ recover() ->
[XName || #exchange{name = XName} <- Xs].
callback(X = #exchange{type = XType}, Fun, Serial0, Args) ->
- Serial = fun (Bool) ->
- case Serial0 of
- _ when is_atom(Serial0) -> Serial0;
- _ -> Serial0(Bool)
- end
+ Serial = if is_function(Serial0) -> Serial0;
+ is_atom(Serial0) -> fun (_Bool) -> Serial0 end
end,
- [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args])
- || M <- decorators()],
+ [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args]) ||
+ M <- decorators()],
Module = type_to_module(XType),
apply(Module, Fun, [Serial(Module:serialise_events()) | Args]).
policy_changed(X1, X2) -> callback(X1, policy_changed, none, [X1, X2]).
serialise_events(X = #exchange{type = Type}) ->
- case [Serialise || M <- decorators(),
- Serialise <- [M:serialise_events(X)],
- Serialise == true] of
- [] -> (type_to_module(Type)):serialise_events();
- _ -> true
- end.
+ lists:any(fun (M) -> M:serialise_events(X) end, decorators())
+ orelse (type_to_module(Type)):serialise_events().
serial(#exchange{name = XName} = X) ->
Serial = case serialise_events(X) of
@@ -318,22 +310,19 @@ route(#exchange{name = #resource{name = <<"">>, virtual_host = VHost}},
[rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)];
route(X = #exchange{name = XName}, Delivery) ->
- route1(Delivery, {queue:from_list([X]), XName, []}).
-
-route1(Delivery, {WorkList, SeenXs, QNames}) ->
- case queue:out(WorkList) of
- {empty, _WorkList} ->
- lists:usort(QNames);
- {{value, X = #exchange{type = Type}}, WorkList1} ->
- DstNames = process_alternate(
- X, ((type_to_module(Type)):route(X, Delivery))),
- route1(Delivery,
- lists:foldl(fun process_route/2, {WorkList1, SeenXs, QNames},
- DstNames))
- end.
+ route1(Delivery, {[X], XName, []}).
+
+route1(_, {[], _, QNames}) ->
+ lists:usort(QNames);
+route1(Delivery, {[X = #exchange{type = Type} | WorkList], SeenXs, QNames}) ->
+ DstNames = process_alternate(
+ X, ((type_to_module(Type)):route(X, Delivery))),
+ route1(Delivery,
+ lists:foldl(fun process_route/2, {WorkList, SeenXs, QNames},
+ DstNames)).
process_alternate(#exchange{arguments = []}, Results) -> %% optimisation
- Results;
+ Results;
process_alternate(#exchange{name = XName, arguments = Args}, []) ->
case rabbit_misc:r_arg(XName, exchange, Args, <<"alternate-exchange">>) of
undefined -> [];
@@ -347,23 +336,25 @@ process_route(#resource{kind = exchange} = XName,
Acc;
process_route(#resource{kind = exchange} = XName,
{WorkList, #resource{kind = exchange} = SeenX, QNames}) ->
- {case lookup(XName) of
- {ok, X} -> queue:in(X, WorkList);
- {error, not_found} -> WorkList
- end, gb_sets:from_list([SeenX, XName]), QNames};
+ {cons_if_present(XName, WorkList),
+ gb_sets:from_list([SeenX, XName]), QNames};
process_route(#resource{kind = exchange} = XName,
{WorkList, SeenXs, QNames} = Acc) ->
case gb_sets:is_element(XName, SeenXs) of
true -> Acc;
- false -> {case lookup(XName) of
- {ok, X} -> queue:in(X, WorkList);
- {error, not_found} -> WorkList
- end, gb_sets:add_element(XName, SeenXs), QNames}
+ false -> {cons_if_present(XName, WorkList),
+ gb_sets:add_element(XName, SeenXs), QNames}
end;
process_route(#resource{kind = queue} = QName,
{WorkList, SeenXs, QNames}) ->
{WorkList, SeenXs, [QName | QNames]}.
+cons_if_present(XName, L) ->
+ case lookup(XName) of
+ {ok, X} -> [X | L];
+ {error, not_found} -> L
+ end.
+
call_with_exchange(XName, Fun) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> case mnesia:read({rabbit_exchange, XName}) of
diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl
index ac6c4b31..4a48a458 100644
--- a/src/rabbit_exchange_type_invalid.erl
+++ b/src/rabbit_exchange_type_invalid.erl
@@ -31,6 +31,10 @@ description() ->
serialise_events() -> false.
+-ifdef(use_specs).
+-spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
+ -> no_return()).
+-endif.
route(#exchange{name = Name, type = Type}, _) ->
rabbit_misc:protocol_error(
precondition_failed,
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index d98baf2e..6c45deea 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -104,8 +104,6 @@ advance_blocks({B1, B2, B3, B4}, I) ->
B5 = erlang:phash2({B1, I}, 4294967296),
{{(B2 bxor B5), (B3 bxor B5), (B4 bxor B5), B5}, I+1}.
-blocks_to_binary({B1, B2, B3, B4}) -> <<B1:32, B2:32, B3:32, B4:32>>.
-
%% generate a GUID. This function should be used when performance is a
%% priority and predictability is not an issue. Otherwise use
%% gen_secure/0.
@@ -114,14 +112,15 @@ gen() ->
%% time we need a new guid we rotate them producing a new hash
%% with the aid of the counter. Look at the comments in
%% advance_blocks/2 for details.
- {BS, I} = case get(guid) of
- undefined -> <<B1:32, B2:32, B3:32, B4:32>> =
- erlang:md5(term_to_binary(fresh())),
- {{B1,B2,B3,B4}, 0};
- {BS0, I0} -> advance_blocks(BS0, I0)
- end,
- put(guid, {BS, I}),
- blocks_to_binary(BS).
+ case get(guid) of
+ undefined -> <<B1:32, B2:32, B3:32, B4:32>> = Res =
+ erlang:md5(term_to_binary(fresh())),
+ put(guid, {{B1, B2, B3, B4}, 0}),
+ Res;
+ {BS, I} -> {{B1, B2, B3, B4}, _} = S = advance_blocks(BS, I),
+ put(guid, S),
+ <<B1:32, B2:32, B3:32, B4:32>>
+ end.
%% generate a non-predictable GUID.
%%
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 6db6ce9d..bcd4861a 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -17,58 +17,60 @@
-module(rabbit_mirror_queue_master).
-export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/4, publish_delivered/4, discard/3, fetch/2, ack/2,
- requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1,
- dropwhile/3, set_ram_duration_target/2, ram_duration/1,
+ purge/1, purge_acks/1, publish/5, publish_delivered/4,
+ discard/3, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3,
+ len/1, is_empty/1, depth/1, drain_confirmed/1,
+ dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2, fold/3]).
+ status/1, invoke/3, is_duplicate/2]).
-export([start/1, stop/0]).
--export([promote_backing_queue_state/7, sender_death_fun/0, depth_fun/0]).
+-export([promote_backing_queue_state/8, sender_death_fun/0, depth_fun/0]).
--export([init_with_existing_bq/3, stop_mirroring/1]).
+-export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/3]).
-behaviour(rabbit_backing_queue).
-include("rabbit.hrl").
--record(state, { gm,
+-record(state, { name,
+ gm,
coordinator,
backing_queue,
backing_queue_state,
- set_delivered,
seen_status,
confirmed,
- ack_msg_id,
known_senders
}).
-ifdef(use_specs).
--export_type([death_fun/0, depth_fun/0]).
+-export_type([death_fun/0, depth_fun/0, stats_fun/0]).
-type(death_fun() :: fun ((pid()) -> 'ok')).
-type(depth_fun() :: fun (() -> 'ok')).
--type(master_state() :: #state { gm :: pid(),
+-type(stats_fun() :: fun ((any()) -> 'ok')).
+-type(master_state() :: #state { name :: rabbit_amqqueue:name(),
+ gm :: pid(),
coordinator :: pid(),
backing_queue :: atom(),
backing_queue_state :: any(),
- set_delivered :: non_neg_integer(),
seen_status :: dict(),
confirmed :: [rabbit_guid:guid()],
- ack_msg_id :: dict(),
known_senders :: set()
}).
--spec(promote_backing_queue_state/7 ::
- (pid(), atom(), any(), pid(), [any()], dict(), [pid()]) ->
- master_state()).
+-spec(promote_backing_queue_state/8 ::
+ (rabbit_amqqueue:name(), pid(), atom(), any(), pid(), [any()], dict(),
+ [pid()]) -> master_state()).
-spec(sender_death_fun/0 :: () -> death_fun()).
-spec(depth_fun/0 :: () -> depth_fun()).
-spec(init_with_existing_bq/3 :: (rabbit_types:amqqueue(), atom(), any()) ->
master_state()).
-spec(stop_mirroring/1 :: (master_state()) -> {atom(), any()}).
+-spec(sync_mirrors/3 :: (stats_fun(), stats_fun(), master_state()) ->
+ {'ok', master_state()} | {stop, any(), master_state()}).
-endif.
@@ -109,14 +111,13 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
end),
{_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
rabbit_mirror_queue_misc:add_mirrors(QName, SNodes),
- #state { gm = GM,
+ #state { name = QName,
+ gm = GM,
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = 0,
seen_status = dict:new(),
confirmed = [],
- ack_msg_id = dict:new(),
known_senders = sets:new() }.
stop_mirroring(State = #state { coordinator = CPid,
@@ -126,6 +127,31 @@ stop_mirroring(State = #state { coordinator = CPid,
stop_all_slaves(shutdown, State),
{BQ, BQS}.
+sync_mirrors(HandleInfo, EmitStats,
+ State = #state { name = QName,
+ gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ Log = fun (Fmt, Params) ->
+ rabbit_log:info("Synchronising ~s: " ++ Fmt ++ "~n",
+ [rabbit_misc:rs(QName) | Params])
+ end,
+ Log("~p messages to synchronise", [BQ:len(BQS)]),
+ {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
+ Ref = make_ref(),
+ Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, Log, SPids),
+ gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}),
+ S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end,
+ case rabbit_mirror_queue_sync:master_go(
+ Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) of
+ {shutdown, R, BQS1} -> {stop, R, S(BQS1)};
+ {sync_died, R, BQS1} -> Log("~p", [R]),
+ {ok, S(BQS1)};
+ {already_synced, BQS1} -> {ok, S(BQS1)};
+ {ok, BQS1} -> Log("complete", []),
+ {ok, S(BQS1)}
+ end.
+
terminate({shutdown, dropped} = Reason,
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -135,8 +161,8 @@ terminate({shutdown, dropped} = Reason,
%% in without this node being restarted. Thus we must do the full
%% blown delete_and_terminate now, but only locally: we do not
%% broadcast delete_and_terminate.
- State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
- set_delivered = 0 };
+ State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)};
+
terminate(Reason,
State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
%% Backing queue termination. The queue is going down but
@@ -147,20 +173,16 @@ terminate(Reason,
delete_and_terminate(Reason, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
stop_all_slaves(Reason, State),
- State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
- set_delivered = 0 }.
-
-stop_all_slaves(Reason, #state{gm = GM}) ->
- Info = gm:info(GM),
- Slaves = [Pid || Pid <- proplists:get_value(group_members, Info),
- node(Pid) =/= node()],
- MRefs = [erlang:monitor(process, S) || S <- Slaves],
+ State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}.
+
+stop_all_slaves(Reason, #state{name = QName, gm = GM}) ->
+ {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
+ MRefs = [erlang:monitor(process, SPid) || SPid <- SPids],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
[receive {'DOWN', MRef, process, _Pid, _Info} -> ok end || MRef <- MRefs],
%% Normally when we remove a slave another slave or master will
%% notice and update Mnesia. But we just removed them all, and
%% have stopped listening ourselves. So manually clean up.
- QName = proplists:get_value(group_name, Info),
rabbit_misc:execute_mnesia_transaction(
fun () ->
[Q] = mnesia:read({rabbit_queue, QName}),
@@ -174,30 +196,29 @@ purge(State = #state { gm = GM,
backing_queue_state = BQS }) ->
ok = gm:broadcast(GM, {drop, 0, BQ:len(BQS), false}),
{Count, BQS1} = BQ:purge(BQS),
- {Count, State #state { backing_queue_state = BQS1,
- set_delivered = 0 }}.
+ {Count, State #state { backing_queue_state = BQS1 }}.
+
+purge_acks(_State) -> exit({not_implemented, {?MODULE, purge_acks}}).
-publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid,
+publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid,
State = #state { gm = GM,
seen_status = SS,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}),
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQS),
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
ChPid, State = #state { gm = GM,
seen_status = SS,
backing_queue = BQ,
- backing_queue_state = BQS,
- ack_msg_id = AM }) ->
+ backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}),
{AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS),
- AM1 = maybe_store_acktag(AckTag, MsgId, AM),
- State1 = State #state { backing_queue_state = BQS1, ack_msg_id = AM1 },
+ State1 = State #state { backing_queue_state = BQS1 },
{AckTag, ensure_monitoring(ChPid, State1)}.
discard(MsgId, ChPid, State = #state { gm = GM,
@@ -220,22 +241,17 @@ discard(MsgId, ChPid, State = #state { gm = GM,
State
end.
-dropwhile(Pred, AckRequired,
- State = #state{gm = GM,
- backing_queue = BQ,
- set_delivered = SetDelivered,
- backing_queue_state = BQS }) ->
+dropwhile(Pred, State = #state{backing_queue = BQ,
+ backing_queue_state = BQS }) ->
Len = BQ:len(BQS),
- {Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
- Len1 = BQ:len(BQS1),
- Dropped = Len - Len1,
- case Dropped of
- 0 -> ok;
- _ -> ok = gm:broadcast(GM, {drop, Len1, Dropped, AckRequired})
- end,
- SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
- {Next, Msgs, State #state { backing_queue_state = BQS1,
- set_delivered = SetDelivered1 } }.
+ {Next, BQS1} = BQ:dropwhile(Pred, BQS),
+ {Next, drop(Len, false, State #state { backing_queue_state = BQS1 })}.
+
+fetchwhile(Pred, Fun, Acc, State = #state{backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ Len = BQ:len(BQS),
+ {Next, Acc1, BQS1} = BQ:fetchwhile(Pred, Fun, Acc, BQS),
+ {Next, Acc1, drop(Len, true, State #state { backing_queue_state = BQS1 })}.
drain_confirmed(State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -267,43 +283,33 @@ drain_confirmed(State = #state { backing_queue = BQ,
seen_status = SS1,
confirmed = [] }}.
-fetch(AckRequired, State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- set_delivered = SetDelivered,
- ack_msg_id = AM }) ->
+fetch(AckRequired, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
State1 = State #state { backing_queue_state = BQS1 },
- case Result of
- empty ->
- {Result, State1};
- {#basic_message { id = MsgId } = Message, IsDelivered, AckTag,
- Remaining} ->
- ok = gm:broadcast(GM, {fetch, AckRequired, MsgId, Remaining}),
- IsDelivered1 = IsDelivered orelse SetDelivered > 0,
- SetDelivered1 = lists:max([0, SetDelivered - 1]),
- AM1 = maybe_store_acktag(AckTag, MsgId, AM),
- {{Message, IsDelivered1, AckTag, Remaining},
- State1 #state { set_delivered = SetDelivered1,
- ack_msg_id = AM1 }}
- end.
+ {Result, case Result of
+ empty -> State1;
+ {_MsgId, _IsDelivered, AckTag} -> drop_one(AckTag, State1)
+ end}.
+
+drop(AckRequired, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {Result, BQS1} = BQ:drop(AckRequired, BQS),
+ State1 = State #state { backing_queue_state = BQS1 },
+ {Result, case Result of
+ empty -> State1;
+ {_MsgId, AckTag} -> drop_one(AckTag, State1)
+ end}.
ack(AckTags, State = #state { gm = GM,
backing_queue = BQ,
- backing_queue_state = BQS,
- ack_msg_id = AM }) ->
+ backing_queue_state = BQS }) ->
{MsgIds, BQS1} = BQ:ack(AckTags, BQS),
case MsgIds of
[] -> ok;
_ -> ok = gm:broadcast(GM, {ack, MsgIds})
end,
- AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
- {MsgIds, State #state { backing_queue_state = BQS1,
- ack_msg_id = AM1 }}.
-
-fold(MsgFun, State = #state { backing_queue = BQ,
- backing_queue_state = BQS }, AckTags) ->
- State #state { backing_queue_state = BQ:fold(MsgFun, BQS, AckTags) }.
+ {MsgIds, State #state { backing_queue_state = BQS1 }}.
requeue(AckTags, State = #state { gm = GM,
backing_queue = BQ,
@@ -312,6 +318,16 @@ requeue(AckTags, State = #state { gm = GM,
ok = gm:broadcast(GM, {requeue, MsgIds}),
{MsgIds, State #state { backing_queue_state = BQS1 }}.
+ackfold(MsgFun, Acc, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }, AckTags) ->
+ {Acc1, BQS1} = BQ:ackfold(MsgFun, Acc, BQS, AckTags),
+ {Acc1, State #state { backing_queue_state = BQS1 }}.
+
+fold(Fun, Acc, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {Result, BQS1} = BQ:fold(Fun, Acc, BQS),
+ {Result, State #state { backing_queue_state = BQS1 }}.
+
len(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:len(BQS).
@@ -399,20 +415,19 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% Other exported functions
%% ---------------------------------------------------------------------------
-promote_backing_queue_state(CPid, BQ, BQS, GM, AckTags, SeenStatus, KS) ->
+promote_backing_queue_state(QName, CPid, BQ, BQS, GM, AckTags, Seen, KS) ->
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
Len = BQ:len(BQS1),
Depth = BQ:depth(BQS1),
true = Len == Depth, %% ASSERTION: everything must have been requeued
ok = gm:broadcast(GM, {depth, Depth}),
- #state { gm = GM,
+ #state { name = QName,
+ gm = GM,
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS1,
- set_delivered = Len,
- seen_status = SeenStatus,
+ seen_status = Seen,
confirmed = [],
- ack_msg_id = dict:new(),
known_senders = sets:from_list(KS) }.
sender_death_fun() ->
@@ -440,8 +455,25 @@ depth_fun() ->
end)
end.
-maybe_store_acktag(undefined, _MsgId, AM) -> AM;
-maybe_store_acktag(AckTag, MsgId, AM) -> dict:store(AckTag, MsgId, AM).
+%% ---------------------------------------------------------------------------
+%% Helpers
+%% ---------------------------------------------------------------------------
+
+drop_one(AckTag, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckTag =/= undefined}),
+ State.
+
+drop(PrevLen, AckRequired, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ Len = BQ:len(BQS),
+ case PrevLen - Len of
+ 0 -> State;
+ Dropped -> ok = gm:broadcast(GM, {drop, Len, Dropped, AckRequired}),
+ State
+ end.
ensure_monitoring(ChPid, State = #state { coordinator = CPid,
known_senders = KS }) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 222457c6..69a3be2b 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -28,7 +28,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, handle_pre_hibernate/1, prioritise_call/3,
- prioritise_cast/2, prioritise_info/2]).
+ prioritise_cast/2, prioritise_info/2, format_message_queue/2]).
-export([joined/2, members_changed/3, handle_msg/3]).
@@ -37,18 +37,10 @@
-include("rabbit.hrl").
-%%----------------------------------------------------------------------------
-
-include("gm_specs.hrl").
--ifdef(use_specs).
-%% Shut dialyzer up
--spec(promote_me/2 :: (_, _) -> no_return()).
--endif.
-
%%----------------------------------------------------------------------------
-
-define(CREATION_EVENT_KEYS,
[pid,
name,
@@ -79,6 +71,8 @@
depth_delta
}).
+%%----------------------------------------------------------------------------
+
start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
set_maximum_since_use(QPid, Age) ->
@@ -222,6 +216,31 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow},
end,
noreply(maybe_enqueue_message(Delivery, State));
+handle_cast({sync_start, Ref, Syncer},
+ State = #state { depth_delta = DD,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ State1 = #state{rate_timer_ref = TRef} = ensure_rate_timer(State),
+ S = fun({MA, TRefN, BQSN}) ->
+ State1#state{depth_delta = undefined,
+ msg_id_ack = dict:from_list(MA),
+ rate_timer_ref = TRefN,
+ backing_queue_state = BQSN}
+ end,
+ case rabbit_mirror_queue_sync:slave(
+ DD, Ref, TRef, Syncer, BQ, BQS,
+ fun (BQN, BQSN) ->
+ BQSN1 = update_ram_duration(BQN, BQSN),
+ TRefN = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL,
+ self(), update_ram_duration),
+ {TRefN, BQSN1}
+ end) of
+ denied -> noreply(State1);
+ {ok, Res} -> noreply(set_delta(0, S(Res)));
+ {failed, Res} -> noreply(S(Res));
+ {stop, Reason, Res} -> {stop, Reason, S(Res)}
+ end;
+
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);
@@ -232,17 +251,13 @@ handle_cast({set_ram_duration_target, Duration},
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
noreply(State #state { backing_queue_state = BQS1 }).
-handle_info(update_ram_duration,
- State = #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
- {RamDuration, BQS1} = BQ:ram_duration(BQS),
- DesiredDuration =
- rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
- BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
+handle_info(update_ram_duration, State = #state{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ BQS1 = update_ram_duration(BQ, BQS),
%% Don't call noreply/1, we don't want to set timers
{State1, Timeout} = next_state(State #state {
rate_timer_ref = undefined,
- backing_queue_state = BQS2 }),
+ backing_queue_state = BQS1 }),
{noreply, State1, Timeout};
handle_info(sync_timeout, State) ->
@@ -321,7 +336,6 @@ prioritise_cast(Msg, _State) ->
{set_maximum_since_use, _Age} -> 8;
{run_backing_queue, _Mod, _Fun} -> 6;
{gm, _Msg} -> 5;
- {post_commit, _Txn, _AckTags} -> 4;
_ -> 0
end.
@@ -332,6 +346,8 @@ prioritise_info(Msg, _State) ->
_ -> 0
end.
+format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
+
%% ---------------------------------------------------------------------------
%% GM
%% ---------------------------------------------------------------------------
@@ -359,6 +375,11 @@ handle_msg([_SPid], _From, process_death) ->
handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) ->
ok = gen_server2:cast(CPid, {gm, Msg}),
{stop, {shutdown, ring_shutdown}};
+handle_msg([SPid], _From, {sync_start, Ref, Syncer, SPids}) ->
+ case lists:member(SPid, SPids) of
+ true -> gen_server2:cast(SPid, {sync_start, Ref, Syncer});
+ false -> ok
+ end;
handle_msg([SPid], _From, Msg) ->
ok = gen_server2:cast(SPid, {gm, Msg}).
@@ -444,6 +465,9 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
handle_process_result({ok, State}) -> noreply(State);
handle_process_result({stop, State}) -> {stop, normal, State}.
+-ifdef(use_specs).
+-spec(promote_me/2 :: ({pid(), term()}, #state{}) -> no_return()).
+-endif.
promote_me(From, #state { q = Q = #amqqueue { name = QName },
gm = GM,
backing_queue = BQ,
@@ -530,7 +554,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)],
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
- CPid, BQ, BQS, GM, AckTags, SS, MPids),
+ QName, CPid, BQ, BQS, GM, AckTags, SS, MPids),
MTC = dict:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) ->
gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
@@ -564,30 +588,18 @@ next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) ->
backing_queue_timeout(State = #state { backing_queue = BQ }) ->
run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State).
-ensure_sync_timer(State = #state { sync_timer_ref = undefined }) ->
- TRef = erlang:send_after(?SYNC_INTERVAL, self(), sync_timeout),
- State #state { sync_timer_ref = TRef };
ensure_sync_timer(State) ->
- State.
+ rabbit_misc:ensure_timer(State, #state.sync_timer_ref,
+ ?SYNC_INTERVAL, sync_timeout).
+
+stop_sync_timer(State) -> rabbit_misc:stop_timer(State, #state.sync_timer_ref).
-stop_sync_timer(State = #state { sync_timer_ref = undefined }) ->
- State;
-stop_sync_timer(State = #state { sync_timer_ref = TRef }) ->
- erlang:cancel_timer(TRef),
- State #state { sync_timer_ref = undefined }.
-
-ensure_rate_timer(State = #state { rate_timer_ref = undefined }) ->
- TRef = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL,
- self(), update_ram_duration),
- State #state { rate_timer_ref = TRef };
ensure_rate_timer(State) ->
- State.
+ rabbit_misc:ensure_timer(State, #state.rate_timer_ref,
+ ?RAM_DURATION_UPDATE_INTERVAL,
+ update_ram_duration).
-stop_rate_timer(State = #state { rate_timer_ref = undefined }) ->
- State;
-stop_rate_timer(State = #state { rate_timer_ref = TRef }) ->
- erlang:cancel_timer(TRef),
- State #state { rate_timer_ref = undefined }.
+stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #state.rate_timer_ref).
ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
State #state { known_senders = pmon:monitor(ChPid, KS) }.
@@ -699,7 +711,7 @@ process_instruction({publish, ChPid, MsgProps,
Msg = #basic_message { id = MsgId }}, State) ->
State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
publish_or_discard(published, ChPid, MsgId, State),
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ BQS1 = BQ:publish(Msg, MsgProps, true, ChPid, BQS),
{ok, State1 #state { backing_queue_state = BQS1 }};
process_instruction({publish_delivered, ChPid, MsgProps,
Msg = #basic_message { id = MsgId }}, State) ->
@@ -723,8 +735,7 @@ process_instruction({drop, Length, Dropped, AckRequired},
end,
State1 = lists:foldl(
fun (const, StateN = #state{backing_queue_state = BQSN}) ->
- {{#basic_message{id = MsgId}, _, AckTag, _}, BQSN1} =
- BQ:fetch(AckRequired, BQSN),
+ {{MsgId, AckTag}, BQSN1} = BQ:drop(AckRequired, BQSN),
maybe_store_ack(
AckRequired, MsgId, AckTag,
StateN #state { backing_queue_state = BQSN1 })
@@ -733,21 +744,6 @@ process_instruction({drop, Length, Dropped, AckRequired},
true -> State1;
false -> update_delta(ToDrop - Dropped, State1)
end};
-process_instruction({fetch, AckRequired, MsgId, Remaining},
- State = #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
- QLen = BQ:len(BQS),
- {ok, case QLen - 1 of
- Remaining ->
- {{#basic_message{id = MsgId}, _IsDelivered,
- AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
- maybe_store_ack(AckRequired, MsgId, AckTag,
- State #state { backing_queue_state = BQS1 });
- _ when QLen =< Remaining andalso AckRequired ->
- State;
- _ when QLen =< Remaining ->
- update_delta(-1, State)
- end};
process_instruction({ack, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -828,6 +824,12 @@ update_delta( DeltaChange, State = #state { depth_delta = Delta }) ->
true = DeltaChange =< 0, %% assertion: we cannot become 'less' sync'ed
set_delta(Delta + DeltaChange, State #state { depth_delta = undefined }).
+update_ram_duration(BQ, BQS) ->
+ {RamDuration, BQS1} = BQ:ram_duration(BQS),
+ DesiredDuration =
+ rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
+ BQ:set_ram_duration_target(DesiredDuration, BQS1).
+
record_synchronised(#amqqueue { name = QName }) ->
Self = self(),
rabbit_misc:execute_mnesia_transaction(
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
new file mode 100644
index 00000000..b8cfe4a9
--- /dev/null
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -0,0 +1,260 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_sync).
+
+-include("rabbit.hrl").
+
+-export([master_prepare/3, master_go/7, slave/7]).
+
+-define(SYNC_PROGRESS_INTERVAL, 1000000).
+
+%% There are three processes around, the master, the syncer and the
+%% slave(s). The syncer is an intermediary, linked to the master in
+%% order to make sure we do not mess with the master's credit flow or
+%% set of monitors.
+%%
+%% Interactions
+%% ------------
+%%
+%% '*' indicates repeating messages. All are standard Erlang messages
+%% except sync_start which is sent over GM to flush out any other
+%% messages that we might have sent that way already. (credit) is the
+%% usual credit_flow bump message every so often.
+%%
+%% Master Syncer Slave(s)
+%% sync_mirrors -> || ||
+%% (from channel) || -- (spawns) --> || ||
+%% || --------- sync_start (over GM) -------> ||
+%% || || <--- sync_ready ---- ||
+%% || || (or) ||
+%% || || <--- sync_deny ----- ||
+%% || <--- ready ---- || ||
+%% || <--- next* ---- || || }
+%% || ---- msg* ----> || || } loop
+%% || || ---- sync_msg* ----> || }
+%% || || <--- (credit)* ----- || }
+%% || <--- next ---- || ||
+%% || ---- done ----> || ||
+%% || || -- sync_complete --> ||
+%% || (Dies) ||
+
+-ifdef(use_specs).
+
+-type(log_fun() :: fun ((string(), [any()]) -> 'ok')).
+-type(bq() :: atom()).
+-type(bqs() :: any()).
+-type(ack() :: any()).
+-type(slave_sync_state() :: {[{rabbit_types:msg_id(), ack()}], timer:tref(),
+ bqs()}).
+
+-spec(master_prepare/3 :: (reference(), log_fun(), [pid()]) -> pid()).
+-spec(master_go/7 :: (pid(), reference(), log_fun(),
+ rabbit_mirror_queue_master:stats_fun(),
+ rabbit_mirror_queue_master:stats_fun(),
+ bq(), bqs()) ->
+ {'already_synced', bqs()} | {'ok', bqs()} |
+ {'shutdown', any(), bqs()} |
+ {'sync_died', any(), bqs()}).
+-spec(slave/7 :: (non_neg_integer(), reference(), timer:tref(), pid(),
+ bq(), bqs(), fun((bq(), bqs()) -> {timer:tref(), bqs()})) ->
+ 'denied' |
+ {'ok' | 'failed', slave_sync_state()} |
+ {'stop', any(), slave_sync_state()}).
+
+-endif.
+
+%% ---------------------------------------------------------------------------
+%% Master
+
+master_prepare(Ref, Log, SPids) ->
+ MPid = self(),
+ spawn_link(fun () -> syncer(Ref, Log, MPid, SPids) end).
+
+master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) ->
+ Args = {Syncer, Ref, Log, HandleInfo, EmitStats, rabbit_misc:get_parent()},
+ receive
+ {'EXIT', Syncer, normal} -> {already_synced, BQS};
+ {'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS};
+ {ready, Syncer} -> EmitStats({syncing, 0}),
+ master_go0(Args, BQ, BQS)
+ end.
+
+master_go0(Args, BQ, BQS) ->
+ case BQ:fold(fun (Msg, MsgProps, Unacked, Acc) ->
+ master_send(Msg, MsgProps, Unacked, Args, Acc)
+ end, {0, erlang:now()}, BQS) of
+ {{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1};
+ {{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1};
+ {_, BQS1} -> master_done(Args, BQS1)
+ end.
+
+master_send(Msg, MsgProps, Unacked,
+ {Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, {I, Last}) ->
+ T = case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of
+ true -> EmitStats({syncing, I}),
+ Log("~p messages", [I]),
+ erlang:now();
+ false -> Last
+ end,
+ HandleInfo({syncing, I}),
+ receive
+ {'$gen_cast', {set_maximum_since_use, Age}} ->
+ ok = file_handle_cache:set_maximum_since_use(Age)
+ after 0 ->
+ ok
+ end,
+ receive
+ {'$gen_call', From,
+ cancel_sync_mirrors} -> stop_syncer(Syncer, {cancel, Ref}),
+ gen_server2:reply(From, ok),
+ {stop, cancelled};
+ {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps, Unacked},
+ {cont, {I + 1, T}};
+ {'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}};
+ {'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}}
+ end.
+
+master_done({Syncer, Ref, _Log, _HandleInfo, _EmitStats, Parent}, BQS) ->
+ receive
+ {next, Ref} -> stop_syncer(Syncer, {done, Ref}),
+ {ok, BQS};
+ {'EXIT', Parent, Reason} -> {shutdown, Reason, BQS};
+ {'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS}
+ end.
+
+stop_syncer(Syncer, Msg) ->
+ unlink(Syncer),
+ Syncer ! Msg,
+ receive {'EXIT', Syncer, _} -> ok
+ after 0 -> ok
+ end.
+
+%% Master
+%% ---------------------------------------------------------------------------
+%% Syncer
+
+syncer(Ref, Log, MPid, SPids) ->
+ [erlang:monitor(process, SPid) || SPid <- SPids],
+ %% We wait for a reply from the slaves so that we know they are in
+ %% a receive block and will thus receive messages we send to them
+ %% *without* those messages ending up in their gen_server2 pqueue.
+ case [SPid || SPid <- SPids,
+ receive
+ {sync_ready, Ref, SPid} -> true;
+ {sync_deny, Ref, SPid} -> false;
+ {'DOWN', _, process, SPid, _} -> false
+ end] of
+ [] -> Log("all slaves already synced", []);
+ SPids1 -> MPid ! {ready, self()},
+ Log("mirrors ~p to sync", [[node(SPid) || SPid <- SPids1]]),
+ syncer_loop(Ref, MPid, SPids1)
+ end.
+
+syncer_loop(Ref, MPid, SPids) ->
+ MPid ! {next, Ref},
+ receive
+ {msg, Ref, Msg, MsgProps, Unacked} ->
+ SPids1 = wait_for_credit(SPids),
+ [begin
+ credit_flow:send(SPid),
+ SPid ! {sync_msg, Ref, Msg, MsgProps, Unacked}
+ end || SPid <- SPids1],
+ syncer_loop(Ref, MPid, SPids1);
+ {cancel, Ref} ->
+ %% We don't tell the slaves we will die - so when we do
+ %% they interpret that as a failure, which is what we
+ %% want.
+ ok;
+ {done, Ref} ->
+ [SPid ! {sync_complete, Ref} || SPid <- SPids]
+ end.
+
+wait_for_credit(SPids) ->
+ case credit_flow:blocked() of
+ true -> receive
+ {bump_credit, Msg} ->
+ credit_flow:handle_bump_msg(Msg),
+ wait_for_credit(SPids);
+ {'DOWN', _, process, SPid, _} ->
+ credit_flow:peer_down(SPid),
+ wait_for_credit(lists:delete(SPid, SPids))
+ end;
+ false -> SPids
+ end.
+
+%% Syncer
+%% ---------------------------------------------------------------------------
+%% Slave
+
+slave(0, Ref, _TRef, Syncer, _BQ, _BQS, _UpdateRamDuration) ->
+ Syncer ! {sync_deny, Ref, self()},
+ denied;
+
+slave(_DD, Ref, TRef, Syncer, BQ, BQS, UpdateRamDuration) ->
+ MRef = erlang:monitor(process, Syncer),
+ Syncer ! {sync_ready, Ref, self()},
+ {_MsgCount, BQS1} = BQ:purge(BQ:purge_acks(BQS)),
+ slave_sync_loop({Ref, MRef, Syncer, BQ, UpdateRamDuration,
+ rabbit_misc:get_parent()}, {[], TRef, BQS1}).
+
+slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent},
+ State = {MA, TRef, BQS}) ->
+ receive
+ {'DOWN', MRef, process, Syncer, _Reason} ->
+ %% If the master dies half way we are not in the usual
+ %% half-synced state (with messages nearer the tail of the
+ %% queue); instead we have ones nearer the head. If we then
+ %% sync with a newly promoted master, or even just receive
+ %% messages from it, we have a hole in the middle. So the
+ %% only thing to do here is purge.
+ {_MsgCount, BQS1} = BQ:purge(BQ:purge_acks(BQS)),
+ credit_flow:peer_down(Syncer),
+ {failed, {[], TRef, BQS1}};
+ {bump_credit, Msg} ->
+ credit_flow:handle_bump_msg(Msg),
+ slave_sync_loop(Args, State);
+ {sync_complete, Ref} ->
+ erlang:demonitor(MRef, [flush]),
+ credit_flow:peer_down(Syncer),
+ {ok, State};
+ {'$gen_cast', {set_maximum_since_use, Age}} ->
+ ok = file_handle_cache:set_maximum_since_use(Age),
+ slave_sync_loop(Args, State);
+ {'$gen_cast', {set_ram_duration_target, Duration}} ->
+ BQS1 = BQ:set_ram_duration_target(Duration, BQS),
+ slave_sync_loop(Args, {MA, TRef, BQS1});
+ update_ram_duration ->
+ {TRef1, BQS1} = UpdateRamDuration(BQ, BQS),
+ slave_sync_loop(Args, {MA, TRef1, BQS1});
+ {sync_msg, Ref, Msg, Props, Unacked} ->
+ credit_flow:ack(Syncer),
+ Props1 = Props#message_properties{needs_confirming = false},
+ {MA1, BQS1} =
+ case Unacked of
+ false -> {MA, BQ:publish(Msg, Props1, true, none, BQS)};
+ true -> {AckTag, BQS2} = BQ:publish_delivered(
+ Msg, Props1, none, BQS),
+ {[{Msg#basic_message.id, AckTag} | MA], BQS2}
+ end,
+ slave_sync_loop(Args, {MA1, TRef, BQS1});
+ {'EXIT', Parent, Reason} ->
+ {stop, Reason, State};
+ %% If the master throws an exception
+ {'$gen_cast', {gm, {delete_and_terminate, Reason}}} ->
+ BQ:delete_and_terminate(Reason, BQS),
+ {stop, Reason, {[], TRef, undefined}}
+ end.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 21dbaeb5..c36fb147 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -67,6 +67,8 @@
-export([check_expiry/1]).
-export([base64url/1]).
-export([interval_operation/4]).
+-export([ensure_timer/4, stop_timer/2]).
+-export([get_parent/0]).
%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
@@ -241,7 +243,9 @@
-spec(interval_operation/4 ::
({atom(), atom(), any()}, float(), non_neg_integer(), non_neg_integer())
-> {any(), non_neg_integer()}).
-
+-spec(ensure_timer/4 :: (A, non_neg_integer(), non_neg_integer(), any()) -> A).
+-spec(stop_timer/2 :: (A, non_neg_integer()) -> A).
+-spec(get_parent/0 :: () -> pid()).
-endif.
%%----------------------------------------------------------------------------
@@ -352,13 +356,12 @@ set_table_value(Table, Key, Type, Value) ->
sort_field_table(
lists:keystore(Key, 1, Table, {Key, Type, Value})).
-r(#resource{virtual_host = VHostPath}, Kind, Name)
- when is_binary(Name) ->
+r(#resource{virtual_host = VHostPath}, Kind, Name) ->
#resource{virtual_host = VHostPath, kind = Kind, name = Name};
-r(VHostPath, Kind, Name) when is_binary(Name) andalso is_binary(VHostPath) ->
+r(VHostPath, Kind, Name) ->
#resource{virtual_host = VHostPath, kind = Kind, name = Name}.
-r(VHostPath, Kind) when is_binary(VHostPath) ->
+r(VHostPath, Kind) ->
#resource{virtual_host = VHostPath, kind = Kind, name = '_'}.
r_arg(#resource{virtual_host = VHostPath}, Kind, Table, Key) ->
@@ -1046,3 +1049,53 @@ interval_operation({M, F, A}, MaxRatio, IdealInterval, LastInterval) ->
{false, false} -> lists:max([IdealInterval,
round(LastInterval / 1.5)])
end}.
+
+ensure_timer(State, Idx, After, Msg) ->
+ case element(Idx, State) of
+ undefined -> TRef = erlang:send_after(After, self(), Msg),
+ setelement(Idx, State, TRef);
+ _ -> State
+ end.
+
+stop_timer(State, Idx) ->
+ case element(Idx, State) of
+ undefined -> State;
+ TRef -> case erlang:cancel_timer(TRef) of
+ false -> State;
+ _ -> setelement(Idx, State, undefined)
+ end
+ end.
+
+%% -------------------------------------------------------------------------
+%% Begin copypasta from gen_server2.erl
+
+get_parent() ->
+ case get('$ancestors') of
+ [Parent | _] when is_pid (Parent) -> Parent;
+ [Parent | _] when is_atom(Parent) -> name_to_pid(Parent);
+ _ -> exit(process_was_not_started_by_proc_lib)
+ end.
+
+name_to_pid(Name) ->
+ case whereis(Name) of
+ undefined -> case whereis_name(Name) of
+ undefined -> exit(could_not_find_registerd_name);
+ Pid -> Pid
+ end;
+ Pid -> Pid
+ end.
+
+whereis_name(Name) ->
+ case ets:lookup(global_names, Name) of
+ [{_Name, Pid, _Method, _RPid, _Ref}] ->
+ if node(Pid) == node() -> case erlang:is_process_alive(Pid) of
+ true -> Pid;
+ false -> undefined
+ end;
+ true -> Pid
+ end;
+ [] -> undefined
+ end.
+
+%% End copypasta from gen_server2.erl
+%% -------------------------------------------------------------------------
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index c7157753..010fd9ac 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -65,7 +65,8 @@
%% Various queries to get the status of the db
-spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} |
- {'running_nodes', [node()]}]).
+ {'running_nodes', [node()]} |
+ {'partitions', [{node(), [node()]}]}]).
-spec(is_clustered/0 :: () -> boolean()).
-spec(cluster_nodes/1 :: ('all' | 'disc' | 'ram' | 'running') -> [node()]).
-spec(node_type/0 :: () -> node_type()).
@@ -599,19 +600,16 @@ discover_cluster(Nodes) when is_list(Nodes) ->
lists:foldl(fun (_, {ok, Res}) -> {ok, Res};
(Node, {error, _}) -> discover_cluster(Node)
end, {error, no_nodes_provided}, Nodes);
+discover_cluster(Node) when Node == node() ->
+ {error, {cannot_discover_cluster, "Cannot cluster node with itself"}};
discover_cluster(Node) ->
OfflineError =
{error, {cannot_discover_cluster,
"The nodes provided are either offline or not running"}},
- case node() of
- Node -> {error, {cannot_discover_cluster,
- "Cannot cluster node with itself"}};
- _ -> case rpc:call(Node,
- rabbit_mnesia, cluster_status_from_mnesia, []) of
- {badrpc, _Reason} -> OfflineError;
- {error, mnesia_not_running} -> OfflineError;
- {ok, Res} -> {ok, Res}
- end
+ case rpc:call(Node, rabbit_mnesia, cluster_status_from_mnesia, []) of
+ {badrpc, _Reason} -> OfflineError;
+ {error, mnesia_not_running} -> OfflineError;
+ {ok, Res} -> {ok, Res}
end.
schema_ok_or_move() ->
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index c2d8e06e..13b40a48 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -943,15 +943,12 @@ next_state(State = #msstate { cref_to_msg_ids = CTM }) ->
_ -> {State, 0}
end.
-start_sync_timer(State = #msstate { sync_timer_ref = undefined }) ->
- TRef = erlang:send_after(?SYNC_INTERVAL, self(), sync),
- State #msstate { sync_timer_ref = TRef }.
+start_sync_timer(State) ->
+ rabbit_misc:ensure_timer(State, #msstate.sync_timer_ref,
+ ?SYNC_INTERVAL, sync).
-stop_sync_timer(State = #msstate { sync_timer_ref = undefined }) ->
- State;
-stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) ->
- erlang:cancel_timer(TRef),
- State #msstate { sync_timer_ref = undefined }.
+stop_sync_timer(State) ->
+ rabbit_misc:stop_timer(State, #msstate.sync_timer_ref).
internal_sync(State = #msstate { current_file_handle = CurHdl,
cref_to_msg_ids = CTM }) ->
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 0a0e51c5..4b6c7538 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -18,7 +18,8 @@
-export([boot/0, start/0, start_tcp_listener/1, start_ssl_listener/2,
stop_tcp_listener/1, on_node_down/1, active_listeners/0,
- node_listeners/1, connections/0, connection_info_keys/0,
+ node_listeners/1, register_connection/1, unregister_connection/1,
+ connections/0, connection_info_keys/0,
connection_info/1, connection_info/2,
connection_info_all/0, connection_info_all/1,
close_connection/2, force_connection_event_refresh/0, tcp_host/1]).
@@ -65,6 +66,8 @@
-spec(stop_tcp_listener/1 :: (listener_config()) -> 'ok').
-spec(active_listeners/0 :: () -> [rabbit_types:listener()]).
-spec(node_listeners/1 :: (node()) -> [rabbit_types:listener()]).
+-spec(register_connection/1 :: (pid()) -> ok).
+-spec(unregister_connection/1 :: (pid()) -> ok).
-spec(connections/0 :: () -> [rabbit_types:connection()]).
-spec(connections_local/0 :: () -> [rabbit_types:connection()]).
-spec(connection_info_keys/0 :: () -> rabbit_types:info_keys()).
@@ -294,20 +297,15 @@ start_client(Sock) ->
start_ssl_client(SslOpts, Sock) ->
start_client(Sock, ssl_transform_fun(SslOpts)).
+register_connection(Pid) -> pg_local:join(rabbit_connections, Pid).
+
+unregister_connection(Pid) -> pg_local:leave(rabbit_connections, Pid).
+
connections() ->
rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
rabbit_networking, connections_local, []).
-connections_local() ->
- [Reader ||
- {_, ConnSup, supervisor, _}
- <- supervisor:which_children(rabbit_tcp_client_sup),
- Reader <- [try
- rabbit_connection_sup:reader(ConnSup)
- catch exit:{noproc, _} ->
- noproc
- end],
- Reader =/= noproc].
+connections_local() -> pg_local:get_members(rabbit_connections).
connection_info_keys() -> rabbit_reader:info_keys().
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index c0b11799..71c2c80a 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -53,7 +53,7 @@
-spec(notify_joined_cluster/0 :: () -> 'ok').
-spec(notify_left_cluster/1 :: (node()) -> 'ok').
--spec(partitions/0 :: () -> {node(), [{atom(), node()}]}).
+-spec(partitions/0 :: () -> {node(), [node()]}).
-endif.
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index bce4b43e..58c906eb 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -64,8 +64,8 @@ list(PluginsDir) ->
[plugin_info(PluginsDir, Plug) || Plug <- EZs ++ FreeApps]),
case Problems of
[] -> ok;
- _ -> io:format("Warning: Problem reading some plugins: ~p~n",
- [Problems])
+ _ -> error_logger:warning_msg(
+ "Problem reading some plugins: ~p~n", [Problems])
end,
Plugins.
@@ -112,8 +112,9 @@ prepare_plugins(EnabledFile, PluginsDistDir, ExpandDir) ->
case Enabled -- plugin_names(ToUnpackPlugins) of
[] -> ok;
- Missing -> io:format("Warning: the following enabled plugins were "
- "not found: ~p~n", [Missing])
+ Missing -> error_logger:warning_msg(
+ "The following enabled plugins were not found: ~p~n",
+ [Missing])
end,
%% Eliminate the contents of the destination directory
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 07117f9a..27ed4722 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -618,12 +618,12 @@ add_to_journal(RelSeq, Action, JEntries) ->
del -> {no_pub, del, no_ack};
ack -> {no_pub, no_del, ack}
end, JEntries);
- ({?PUB, del, no_ack}) when Action == ack ->
- array:reset(RelSeq, JEntries);
- ({Pub, no_del, no_ack}) when Action == del ->
- array:set(RelSeq, {Pub, del, no_ack}, JEntries);
- ({Pub, Del, no_ack}) when Action == ack ->
- array:set(RelSeq, {Pub, Del, ack}, JEntries)
+ ({Pub, no_del, no_ack}) when Action == del ->
+ array:set(RelSeq, {Pub, del, no_ack}, JEntries);
+ ({no_pub, del, no_ack}) when Action == ack ->
+ array:set(RelSeq, {no_pub, del, ack}, JEntries);
+ ({?PUB, del, no_ack}) when Action == ack ->
+ array:reset(RelSeq, JEntries)
end.
maybe_flush_journal(State = #qistate { dirty_count = DCount,
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 3a517677..af7aac6f 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -23,7 +23,7 @@
-export([system_continue/3, system_terminate/4, system_code_change/4]).
--export([init/4, mainloop/2]).
+-export([init/4, mainloop/2, recvloop/2]).
-export([conserve_resources/3, server_properties/1]).
@@ -35,12 +35,17 @@
%%--------------------------------------------------------------------------
--record(v1, {parent, sock, name, connection, callback, recv_len, pending_recv,
+-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, queue_collector, heartbeater, stats_timer,
- channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len,
- auth_mechanism, auth_state, conserve_resources,
- last_blocked_by, last_blocked_at, host, peer_host,
- port, peer_port}).
+ conn_sup_pid, channel_sup_sup_pid, start_heartbeat_fun,
+ buf, buf_len, throttle}).
+
+-record(connection, {name, host, peer_host, port, peer_port,
+ protocol, user, timeout_sec, frame_max, vhost,
+ client_properties, capabilities,
+ auth_mechanism, auth_state}).
+
+-record(throttle, {conserve_resources, last_blocked_by, last_blocked_at}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, last_blocked_by, last_blocked_age,
@@ -60,6 +65,10 @@
State#v1.connection_state =:= blocking orelse
State#v1.connection_state =:= blocked)).
+-define(IS_STOPPING(State),
+ (State#v1.connection_state =:= closing orelse
+ State#v1.connection_state =:= closed)).
+
%%--------------------------------------------------------------------------
-ifdef(use_specs).
@@ -101,12 +110,12 @@ start_link(ChannelSupSupPid, Collector, StartHeartbeatFun) ->
shutdown(Pid, Explanation) ->
gen_server:call(Pid, {shutdown, Explanation}, infinity).
-init(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun) ->
+init(Parent, ConnSupPid, Collector, StartHeartbeatFun) ->
Deb = sys:debug_options([]),
receive
{go, Sock, SockTransform} ->
start_connection(
- Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock,
+ Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb, Sock,
SockTransform)
end.
@@ -195,7 +204,7 @@ name(Sock) ->
socket_ends(Sock) ->
socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end).
-start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
+start_connection(Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb,
Sock, SockTransform) ->
process_flag(trap_exit, true),
Name = name(Sock),
@@ -205,39 +214,42 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
{PeerHost, PeerPort, Host, Port} = socket_ends(Sock),
State = #v1{parent = Parent,
sock = ClientSock,
- name = list_to_binary(Name),
connection = #connection{
+ name = list_to_binary(Name),
+ host = Host,
+ peer_host = PeerHost,
+ port = Port,
+ peer_port = PeerPort,
protocol = none,
user = none,
timeout_sec = ?HANDSHAKE_TIMEOUT,
frame_max = ?FRAME_MIN_SIZE,
vhost = none,
client_properties = none,
- capabilities = []},
+ capabilities = [],
+ auth_mechanism = none,
+ auth_state = none},
callback = uninitialized_callback,
recv_len = 0,
pending_recv = false,
connection_state = pre_init,
queue_collector = Collector,
heartbeater = none,
- channel_sup_sup_pid = ChannelSupSupPid,
+ conn_sup_pid = ConnSupPid,
+ channel_sup_sup_pid = none,
start_heartbeat_fun = StartHeartbeatFun,
buf = [],
buf_len = 0,
- auth_mechanism = none,
- auth_state = none,
- conserve_resources = false,
- last_blocked_by = none,
- last_blocked_at = never,
- host = Host,
- peer_host = PeerHost,
- port = Port,
- peer_port = PeerPort},
+ throttle = #throttle{
+ conserve_resources = false,
+ last_blocked_by = none,
+ last_blocked_at = never}},
try
ok = inet_op(fun () -> rabbit_net:tune_buffer_size(ClientSock) end),
- recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
- State, #v1.stats_timer),
- handshake, 8)),
+ run({?MODULE, recvloop,
+ [Deb, switch_callback(rabbit_event:init_stats_timer(
+ State, #v1.stats_timer),
+ handshake, 8)]}),
log(info, "closing AMQP connection ~p (~s)~n", [self(), Name])
catch
Ex -> log(case Ex of
@@ -254,10 +266,16 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
%% accounting as accurate as possible we ought to close the
%% socket w/o delay before termination.
rabbit_net:fast_close(ClientSock),
+ rabbit_networking:unregister_connection(self()),
rabbit_event:notify(connection_closed, [{pid, self()}])
end,
done.
+run({M, F, A}) ->
+ try apply(M, F, A)
+ catch {become, MFA} -> run(MFA)
+ end.
+
recvloop(Deb, State = #v1{pending_recv = true}) ->
mainloop(Deb, State);
recvloop(Deb, State = #v1{connection_state = blocked}) ->
@@ -288,8 +306,10 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
{other, Other} -> handle_other(Other, Deb, State)
end.
-handle_other({conserve_resources, Conserve}, Deb, State) ->
- recvloop(Deb, control_throttle(State#v1{conserve_resources = Conserve}));
+handle_other({conserve_resources, Conserve}, Deb,
+ State = #v1{throttle = Throttle}) ->
+ Throttle1 = Throttle#throttle{conserve_resources = Conserve},
+ recvloop(Deb, control_throttle(State#v1{throttle = Throttle1}));
handle_other({channel_closing, ChPid}, Deb, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
channel_cleanup(ChPid),
@@ -316,9 +336,7 @@ handle_other({'DOWN', _MRef, process, ChPid, Reason}, Deb, State) ->
handle_other(terminate_connection, _Deb, State) ->
State;
handle_other(handshake_timeout, Deb, State)
- when ?IS_RUNNING(State) orelse
- State#v1.connection_state =:= closing orelse
- State#v1.connection_state =:= closed ->
+ when ?IS_RUNNING(State) orelse ?IS_STOPPING(State) ->
mainloop(Deb, State);
handle_other(handshake_timeout, _Deb, State) ->
throw({handshake_timeout, State#v1.callback});
@@ -372,29 +390,31 @@ terminate(Explanation, State) when ?IS_RUNNING(State) ->
terminate(_Explanation, State) ->
{force, State}.
-control_throttle(State = #v1{connection_state = CS,
- conserve_resources = Mem}) ->
- case {CS, Mem orelse credit_flow:blocked()} of
+control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) ->
+ case {CS, (Throttle#throttle.conserve_resources orelse
+ credit_flow:blocked())} of
{running, true} -> State#v1{connection_state = blocking};
{blocking, false} -> State#v1{connection_state = running};
{blocked, false} -> ok = rabbit_heartbeat:resume_monitor(
State#v1.heartbeater),
State#v1{connection_state = running};
- {blocked, true} -> update_last_blocked_by(State);
+ {blocked, true} -> State#v1{throttle = update_last_blocked_by(
+ Throttle)};
{_, _} -> State
end.
-maybe_block(State = #v1{connection_state = blocking}) ->
+maybe_block(State = #v1{connection_state = blocking, throttle = Throttle}) ->
ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater),
- update_last_blocked_by(State#v1{connection_state = blocked,
- last_blocked_at = erlang:now()});
+ State#v1{connection_state = blocked,
+ throttle = update_last_blocked_by(
+ Throttle#throttle{last_blocked_at = erlang:now()})};
maybe_block(State) ->
State.
-update_last_blocked_by(State = #v1{conserve_resources = true}) ->
- State#v1{last_blocked_by = resource};
-update_last_blocked_by(State = #v1{conserve_resources = false}) ->
- State#v1{last_blocked_by = flow}.
+update_last_blocked_by(Throttle = #throttle{conserve_resources = true}) ->
+ Throttle#throttle{last_blocked_by = resource};
+update_last_blocked_by(Throttle = #throttle{conserve_resources = false}) ->
+ Throttle#throttle{last_blocked_by = flow}.
%%--------------------------------------------------------------------------
%% error handling / termination
@@ -531,9 +551,10 @@ payload_snippet(<<Snippet:16/binary, _/binary>>) ->
%%--------------------------------------------------------------------------
create_channel(Channel, State) ->
- #v1{sock = Sock, name = Name, queue_collector = Collector,
+ #v1{sock = Sock, queue_collector = Collector,
channel_sup_sup_pid = ChanSupSup,
- connection = #connection{protocol = Protocol,
+ connection = #connection{name = Name,
+ protocol = Protocol,
frame_max = FrameMax,
user = User,
vhost = VHost,
@@ -562,17 +583,13 @@ all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
%%--------------------------------------------------------------------------
handle_frame(Type, 0, Payload,
- State = #v1{connection_state = CS,
- connection = #connection{protocol = Protocol}})
- when CS =:= closing; CS =:= closed ->
+ State = #v1{connection = #connection{protocol = Protocol}})
+ when ?IS_STOPPING(State) ->
case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
_Other -> State
end;
-handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS})
- when CS =:= closing; CS =:= closed ->
- State;
handle_frame(Type, 0, Payload,
State = #v1{connection = #connection{protocol = Protocol}}) ->
case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
@@ -590,44 +607,42 @@ handle_frame(Type, Channel, Payload,
heartbeat -> unexpected_frame(Type, Channel, Payload, State);
Frame -> process_frame(Frame, Channel, State)
end;
+handle_frame(_Type, _Channel, _Payload, State) when ?IS_STOPPING(State) ->
+ State;
handle_frame(Type, Channel, Payload, State) ->
unexpected_frame(Type, Channel, Payload, State).
process_frame(Frame, Channel, State) ->
- {ChPid, AState} = case get({channel, Channel}) of
+ ChKey = {channel, Channel},
+ {ChPid, AState} = case get(ChKey) of
undefined -> create_channel(Channel, State);
Other -> Other
end,
- case process_channel_frame(Frame, ChPid, AState) of
- {ok, NewAState} -> put({channel, Channel}, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State);
- {error, Reason} -> handle_exception(State, Channel, Reason)
- end.
-
-process_channel_frame(Frame, ChPid, AState) ->
case rabbit_command_assembler:process(Frame, AState) of
- {ok, NewAState} -> {ok, NewAState};
- {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
- {ok, NewAState};
- {ok, Method, Content, NewAState} -> rabbit_channel:do_flow(
- ChPid, Method, Content),
- {ok, NewAState};
- {error, Reason} -> {error, Reason}
+ {ok, NewAState} ->
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ {ok, Method, NewAState} ->
+ rabbit_channel:do(ChPid, Method),
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ {ok, Method, Content, NewAState} ->
+ rabbit_channel:do_flow(ChPid, Method, Content),
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, control_throttle(State));
+ {error, Reason} ->
+ handle_exception(State, Channel, Reason)
end.
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
channel_cleanup(ChPid),
- control_throttle(State);
-post_process_frame({method, MethodName, _}, _ChPid,
- State = #v1{connection = #connection{
- protocol = Protocol}}) ->
- case Protocol:method_has_content(MethodName) of
- true -> erlang:bump_reductions(2000),
- maybe_block(control_throttle(State));
- false -> control_throttle(State)
- end;
+ State;
+post_process_frame({content_header, _, _, _, _}, _ChPid, State) ->
+ maybe_block(State);
+post_process_frame({content_body, _}, _ChPid, State) ->
+ maybe_block(State);
post_process_frame(_Frame, _ChPid, State) ->
- control_throttle(State).
+ State.
%%--------------------------------------------------------------------------
@@ -683,8 +698,12 @@ handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) ->
handle_input(handshake, <<"AMQP", 1, 1, 9, 1>>, State) ->
start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State);
+%% ... and finally, the 1.0 spec is crystal clear! Note that the
+handle_input(handshake, <<"AMQP", Id, 1, 0, 0>>, State) ->
+ become_1_0(Id, State);
+
handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) ->
- refuse_connection(Sock, {bad_version, A, B, C, D});
+ refuse_connection(Sock, {bad_version, {A, B, C, D}});
handle_input(handshake, Other, #v1{sock = Sock}) ->
refuse_connection(Sock, {bad_header, Other});
@@ -698,6 +717,7 @@ handle_input(Callback, Data, _State) ->
start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
Protocol,
State = #v1{sock = Sock, connection = Connection}) ->
+ rabbit_networking:register_connection(self()),
Start = #'connection.start'{
version_major = ProtocolMajor,
version_minor = ProtocolMinor,
@@ -711,10 +731,13 @@ start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
connection_state = starting},
frame_header, 7).
-refuse_connection(Sock, Exception) ->
- ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end),
+refuse_connection(Sock, Exception, {A, B, C, D}) ->
+ ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",A,B,C,D>>) end),
throw(Exception).
+refuse_connection(Sock, Exception) ->
+ refuse_connection(Sock, Exception, {0, 0, 9, 1}).
+
ensure_stats_timer(State = #v1{connection_state = running}) ->
rabbit_event:ensure_stats_timer(State, #v1.stats_timer, emit_stats);
ensure_stats_timer(State) ->
@@ -746,13 +769,13 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
{table, Capabilities1} -> Capabilities1;
_ -> []
end,
- State = State0#v1{auth_mechanism = AuthMechanism,
- auth_state = AuthMechanism:init(Sock),
- connection_state = securing,
+ State = State0#v1{connection_state = securing,
connection =
Connection#connection{
client_properties = ClientProperties,
- capabilities = Capabilities}},
+ capabilities = Capabilities,
+ auth_mechanism = AuthMechanism,
+ auth_state = AuthMechanism:init(Sock)}},
auth_phase(Response, State);
handle_method0(#'connection.secure_ok'{response = Response},
@@ -790,18 +813,27 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
handle_method0(#'connection.open'{virtual_host = VHostPath},
State = #v1{connection_state = opening,
- connection = Connection = #connection{
- user = User,
- protocol = Protocol},
- sock = Sock}) ->
+ connection = Connection = #connection{
+ user = User,
+ protocol = Protocol},
+ conn_sup_pid = ConnSupPid,
+ sock = Sock,
+ throttle = Throttle}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
+ Throttle1 = Throttle#throttle{conserve_resources = Conserve},
+ {ok, ChannelSupSupPid} =
+ supervisor2:start_child(
+ ConnSupPid,
+ {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
+ intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}),
State1 = control_throttle(
- State#v1{connection_state = running,
- connection = NewConnection,
- conserve_resources = Conserve}),
+ State#v1{connection_state = running,
+ connection = NewConnection,
+ channel_sup_sup_pid = ChannelSupSupPid,
+ throttle = Throttle1}),
rabbit_event:notify(connection_created,
[{type, network} |
infos(?CREATION_EVENT_KEYS, State1)]),
@@ -812,10 +844,9 @@ handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
lists:foreach(fun rabbit_channel:shutdown/1, all_channels()),
maybe_close(State#v1{connection_state = closing});
handle_method0(#'connection.close'{},
- State = #v1{connection_state = CS,
- connection = #connection{protocol = Protocol},
+ State = #v1{connection = #connection{protocol = Protocol},
sock = Sock})
- when CS =:= closing; CS =:= closed ->
+ when ?IS_STOPPING(State) ->
%% We're already closed or closing, so we don't need to cleanup
%% anything.
ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
@@ -824,8 +855,7 @@ handle_method0(#'connection.close_ok'{},
State = #v1{connection_state = closed}) ->
self() ! terminate_connection,
State;
-handle_method0(_Method, State = #v1{connection_state = CS})
- when CS =:= closing; CS =:= closed ->
+handle_method0(_Method, State) when ?IS_STOPPING(State) ->
State;
handle_method0(_Method, #v1{connection_state = S}) ->
rabbit_misc:protocol_error(
@@ -870,10 +900,10 @@ auth_mechanisms_binary(Sock) ->
string:join([atom_to_list(A) || A <- auth_mechanisms(Sock)], " ")).
auth_phase(Response,
- State = #v1{auth_mechanism = AuthMechanism,
- auth_state = AuthState,
- connection = Connection =
- #connection{protocol = Protocol},
+ State = #v1{connection = Connection =
+ #connection{protocol = Protocol,
+ auth_mechanism = AuthMechanism,
+ auth_state = AuthState},
sock = Sock}) ->
case AuthMechanism:handle_response(Response, AuthState) of
{refused, Msg, Args} ->
@@ -886,14 +916,16 @@ auth_phase(Response,
{challenge, Challenge, AuthState1} ->
Secure = #'connection.secure'{challenge = Challenge},
ok = send_on_channel0(Sock, Secure, Protocol),
- State#v1{auth_state = AuthState1};
+ State#v1{connection = Connection#connection{
+ auth_state = AuthState1}};
{ok, User} ->
Tune = #'connection.tune'{channel_max = 0,
frame_max = server_frame_max(),
heartbeat = server_heartbeat()},
ok = send_on_channel0(Sock, Tune, Protocol),
State#v1{connection_state = tuning,
- connection = Connection#connection{user = User}}
+ connection = Connection#connection{user = User,
+ auth_state = none}}
end.
%%--------------------------------------------------------------------------
@@ -901,11 +933,6 @@ auth_phase(Response,
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, #v1{}) -> self();
-i(name, #v1{name = Name}) -> Name;
-i(host, #v1{host = Host}) -> Host;
-i(peer_host, #v1{peer_host = PeerHost}) -> PeerHost;
-i(port, #v1{port = Port}) -> Port;
-i(peer_port, #v1{peer_port = PeerPort}) -> PeerPort;
i(SockStat, S) when SockStat =:= recv_oct;
SockStat =:= recv_cnt;
SockStat =:= send_oct;
@@ -922,36 +949,32 @@ i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S);
i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S);
i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S);
i(state, #v1{connection_state = CS}) -> CS;
-i(last_blocked_by, #v1{last_blocked_by = By}) -> By;
-i(last_blocked_age, #v1{last_blocked_at = never}) ->
+i(last_blocked_by, #v1{throttle = #throttle{last_blocked_by = By}}) -> By;
+i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = never}}) ->
infinity;
-i(last_blocked_age, #v1{last_blocked_at = T}) ->
+i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = T}}) ->
timer:now_diff(erlang:now(), T) / 1000000;
i(channels, #v1{}) -> length(all_channels());
-i(auth_mechanism, #v1{auth_mechanism = none}) ->
+i(Item, #v1{connection = Conn}) -> ic(Item, Conn).
+
+ic(name, #connection{name = Name}) -> Name;
+ic(host, #connection{host = Host}) -> Host;
+ic(peer_host, #connection{peer_host = PeerHost}) -> PeerHost;
+ic(port, #connection{port = Port}) -> Port;
+ic(peer_port, #connection{peer_port = PeerPort}) -> PeerPort;
+ic(protocol, #connection{protocol = none}) -> none;
+ic(protocol, #connection{protocol = P}) -> P:version();
+ic(user, #connection{user = none}) -> '';
+ic(user, #connection{user = U}) -> U#user.username;
+ic(vhost, #connection{vhost = VHost}) -> VHost;
+ic(timeout, #connection{timeout_sec = Timeout}) -> Timeout;
+ic(frame_max, #connection{frame_max = FrameMax}) -> FrameMax;
+ic(client_properties, #connection{client_properties = CP}) -> CP;
+ic(auth_mechanism, #connection{auth_mechanism = none}) ->
none;
-i(auth_mechanism, #v1{auth_mechanism = Mechanism}) ->
+ic(auth_mechanism, #connection{auth_mechanism = Mechanism}) ->
proplists:get_value(name, Mechanism:description());
-i(protocol, #v1{connection = #connection{protocol = none}}) ->
- none;
-i(protocol, #v1{connection = #connection{protocol = Protocol}}) ->
- Protocol:version();
-i(user, #v1{connection = #connection{user = none}}) ->
- '';
-i(user, #v1{connection = #connection{user = #user{
- username = Username}}}) ->
- Username;
-i(vhost, #v1{connection = #connection{vhost = VHost}}) ->
- VHost;
-i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) ->
- Timeout;
-i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) ->
- FrameMax;
-i(client_properties, #v1{connection = #connection{client_properties =
- ClientProperties}}) ->
- ClientProperties;
-i(Item, #v1{}) ->
- throw({bad_argument, Item}).
+ic(Item, #connection{}) -> throw({bad_argument, Item}).
socket_info(Get, Select, #v1{sock = Sock}) ->
case Get(Sock) of
@@ -980,3 +1003,33 @@ cert_info(F, #v1{sock = Sock}) ->
emit_stats(State) ->
rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
rabbit_event:reset_stats_timer(State, #v1.stats_timer).
+
+%% 1.0 stub
+-ifdef(use_specs).
+-spec(become_1_0/2 :: (non_neg_integer(), #v1{}) -> no_return()).
+-endif.
+become_1_0(Id, State = #v1{sock = Sock}) ->
+ case code:is_loaded(rabbit_amqp1_0_reader) of
+ false -> refuse_connection(Sock, amqp1_0_plugin_not_enabled);
+ _ -> Mode = case Id of
+ 0 -> amqp;
+ 3 -> sasl;
+ _ -> refuse_connection(
+ Sock, {unsupported_amqp1_0_protocol_id, Id},
+ {3, 1, 0, 0})
+ end,
+ throw({become, {rabbit_amqp1_0_reader, init,
+ [Mode, pack_for_1_0(State)]}})
+ end.
+
+pack_for_1_0(#v1{parent = Parent,
+ sock = Sock,
+ recv_len = RecvLen,
+ pending_recv = PendingRecv,
+ queue_collector = QueueCollector,
+ conn_sup_pid = ConnSupPid,
+ start_heartbeat_fun = SHF,
+ buf = Buf,
+ buf_len = BufLen}) ->
+ {Parent, Sock, RecvLen, PendingRecv, QueueCollector, ConnSupPid, SHF,
+ Buf, BufLen}.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 87946888..f5ea4fba 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -61,6 +61,7 @@ all_tests() ->
passed = test_runtime_parameters(),
passed = test_policy_validation(),
passed = test_server_status(),
+ passed = test_amqp_connection_refusal(),
passed = test_confirms(),
passed =
do_if_secondary_node(
@@ -1119,11 +1120,9 @@ test_server_status() ->
rabbit_misc:r(<<"/">>, queue, <<"foo">>)),
%% list connections
- [#listener{host = H, port = P} | _] =
- [L || L = #listener{node = N} <- rabbit_networking:active_listeners(),
- N =:= node()],
-
- {ok, _C} = gen_tcp:connect(H, P, []),
+ {H, P} = find_listener(),
+ {ok, C} = gen_tcp:connect(H, P, []),
+ gen_tcp:send(C, <<"AMQP", 0, 0, 9, 1>>),
timer:sleep(100),
ok = info_action(list_connections,
rabbit_networking:connection_info_keys(), false),
@@ -1160,6 +1159,25 @@ test_server_status() ->
passed.
+test_amqp_connection_refusal() ->
+ [passed = test_amqp_connection_refusal(V) ||
+ V <- [<<"AMQP",9,9,9,9>>, <<"AMQP",0,1,0,0>>, <<"XXXX",0,0,9,1>>]],
+ passed.
+
+test_amqp_connection_refusal(Header) ->
+ {H, P} = find_listener(),
+ {ok, C} = gen_tcp:connect(H, P, [binary, {active, false}]),
+ ok = gen_tcp:send(C, Header),
+ {ok, <<"AMQP",0,0,9,1>>} = gen_tcp:recv(C, 8, 100),
+ ok = gen_tcp:close(C),
+ passed.
+
+find_listener() ->
+ [#listener{host = H, port = P} | _] =
+ [L || L = #listener{node = N} <- rabbit_networking:active_listeners(),
+ N =:= node()],
+ {H, P}.
+
test_writer() -> test_writer(none).
test_writer(Pid) ->
@@ -1302,8 +1320,7 @@ test_statistics() ->
QName = receive #'queue.declare_ok'{queue = Q0} -> Q0
after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok)
end,
- {ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)),
- QPid = Q#amqqueue.pid,
+ QRes = rabbit_misc:r(<<"/">>, queue, QName),
X = rabbit_misc:r(<<"/">>, exchange, <<"">>),
rabbit_tests_event_receiver:start(self(), [node()], [channel_stats]),
@@ -1327,9 +1344,9 @@ test_statistics() ->
length(proplists:get_value(
channel_queue_exchange_stats, E)) > 0
end),
- [{QPid,[{get,1}]}] = proplists:get_value(channel_queue_stats, Event2),
+ [{QRes, [{get,1}]}] = proplists:get_value(channel_queue_stats, Event2),
[{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event2),
- [{{QPid,X},[{publish,1}]}] =
+ [{{QRes,X},[{publish,1}]}] =
proplists:get_value(channel_queue_exchange_stats, Event2),
%% Check the stats remove stuff on queue deletion
@@ -1354,31 +1371,31 @@ test_refresh_events(SecondaryNode) ->
[channel_created, queue_created]),
{_Writer, Ch} = test_spawn(),
- expect_events(Ch, channel_created),
+ expect_events(pid, Ch, channel_created),
rabbit_channel:shutdown(Ch),
{_Writer2, Ch2} = test_spawn(SecondaryNode),
- expect_events(Ch2, channel_created),
+ expect_events(pid, Ch2, channel_created),
rabbit_channel:shutdown(Ch2),
- {new, #amqqueue { pid = QPid } = Q} =
+ {new, #amqqueue{name = QName} = Q} =
rabbit_amqqueue:declare(test_queue(), false, false, [], none),
- expect_events(QPid, queue_created),
+ expect_events(name, QName, queue_created),
rabbit_amqqueue:delete(Q, false, false),
rabbit_tests_event_receiver:stop(),
passed.
-expect_events(Pid, Type) ->
- expect_event(Pid, Type),
+expect_events(Tag, Key, Type) ->
+ expect_event(Tag, Key, Type),
rabbit:force_event_refresh(),
- expect_event(Pid, Type).
+ expect_event(Tag, Key, Type).
-expect_event(Pid, Type) ->
+expect_event(Tag, Key, Type) ->
receive #event{type = Type, props = Props} ->
- case pget(pid, Props) of
- Pid -> ok;
- _ -> expect_event(Pid, Type)
+ case pget(Tag, Props) of
+ Key -> ok;
+ _ -> expect_event(Tag, Key, Type)
end
after ?TIMEOUT -> throw({failed_to_receive_event, Type})
end.
@@ -2233,6 +2250,10 @@ variable_queue_publish(IsPersistent, Count, VQ) ->
variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ).
variable_queue_publish(IsPersistent, Count, PropFun, VQ) ->
+ variable_queue_publish(IsPersistent, 1, Count, PropFun,
+ fun (_N) -> <<>> end, VQ).
+
+variable_queue_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) ->
lists:foldl(
fun (N, VQN) ->
rabbit_variable_queue:publish(
@@ -2241,16 +2262,18 @@ variable_queue_publish(IsPersistent, Count, PropFun, VQ) ->
<<>>, #'P_basic'{delivery_mode = case IsPersistent of
true -> 2;
false -> 1
- end}, <<>>),
- PropFun(N, #message_properties{}), self(), VQN)
- end, VQ, lists:seq(1, Count)).
+ end},
+ PayloadFun(N)),
+ PropFun(N, #message_properties{}), false, self(), VQN)
+ end, VQ, lists:seq(Start, Start + Count - 1)).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
lists:foldl(fun (N, {VQN, AckTagsAcc}) ->
Rem = Len - N,
{{#basic_message { is_persistent = IsPersistent },
- IsDelivered, AckTagN, Rem}, VQM} =
+ IsDelivered, AckTagN}, VQM} =
rabbit_variable_queue:fetch(true, VQN),
+ Rem = rabbit_variable_queue:len(VQM),
{VQM, [AckTagN | AckTagsAcc]}
end, {VQ, []}, lists:seq(1, Count)).
@@ -2316,38 +2339,126 @@ test_variable_queue() ->
fun test_variable_queue_partial_segments_delta_thing/1,
fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1,
fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1,
+ fun test_drop/1,
fun test_variable_queue_fold_msg_on_disk/1,
- fun test_dropwhile/1,
+ fun test_dropfetchwhile/1,
fun test_dropwhile_varying_ram_duration/1,
+ fun test_fetchwhile_varying_ram_duration/1,
fun test_variable_queue_ack_limiting/1,
- fun test_variable_queue_requeue/1]],
+ fun test_variable_queue_purge/1,
+ fun test_variable_queue_requeue/1,
+ fun test_variable_queue_fold/1]],
passed.
-test_variable_queue_requeue(VQ0) ->
- Interval = 50,
- Count = rabbit_queue_index:next_segment_boundary(0) + 2 * Interval,
+test_variable_queue_fold(VQ0) ->
+ {PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} =
+ variable_queue_with_holes(VQ0),
+ Count = rabbit_variable_queue:depth(VQ1),
+ Msgs = lists:sort(PendingMsgs ++ RequeuedMsgs ++ FreshMsgs),
+ lists:foldl(fun (Cut, VQ2) ->
+ test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ2)
+ end, VQ1, [0, 1, 2, Count div 2,
+ Count - 1, Count, Count + 1, Count * 2]).
+
+test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ0) ->
+ {Acc, VQ1} = rabbit_variable_queue:fold(
+ fun (M, _, Pending, A) ->
+ MInt = msg2int(M),
+ Pending = lists:member(MInt, PendingMsgs), %% assert
+ case MInt =< Cut of
+ true -> {cont, [MInt | A]};
+ false -> {stop, A}
+ end
+ end, [], VQ0),
+ Expected = lists:takewhile(fun (I) -> I =< Cut end, Msgs),
+ Expected = lists:reverse(Acc), %% assertion
+ VQ1.
+
+msg2int(#basic_message{content = #content{ payload_fragments_rev = P}}) ->
+ binary_to_term(list_to_binary(lists:reverse(P))).
+
+ack_subset(AckSeqs, Interval, Rem) ->
+ lists:filter(fun ({_Ack, N}) -> (N + Rem) rem Interval == 0 end, AckSeqs).
+
+requeue_one_by_one(Acks, VQ) ->
+ lists:foldl(fun (AckTag, VQN) ->
+ {_MsgId, VQM} = rabbit_variable_queue:requeue(
+ [AckTag], VQN),
+ VQM
+ end, VQ, Acks).
+
+%% Create a vq with messages in q1, delta, and q3, and holes (in the
+%% form of pending acks) in the latter two.
+variable_queue_with_holes(VQ0) ->
+ Interval = 64,
+ Count = rabbit_queue_index:next_segment_boundary(0)*2 + 2 * Interval,
Seq = lists:seq(1, Count),
VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
- VQ2 = variable_queue_publish(false, Count, VQ1),
- {VQ3, Acks} = variable_queue_fetch(Count, false, false, Count, VQ2),
- Subset = lists:foldl(fun ({Ack, N}, Acc) when N rem Interval == 0 ->
- [Ack | Acc];
- (_, Acc) ->
- Acc
- end, [], lists:zip(Acks, Seq)),
- {_MsgIds, VQ4} = rabbit_variable_queue:requeue(Acks -- Subset, VQ3),
- VQ5 = lists:foldl(fun (AckTag, VQN) ->
- {_MsgId, VQM} = rabbit_variable_queue:requeue(
- [AckTag], VQN),
- VQM
- end, VQ4, Subset),
- VQ6 = lists:foldl(fun (AckTag, VQa) ->
- {{#basic_message{}, true, AckTag, _}, VQb} =
+ VQ2 = variable_queue_publish(
+ false, 1, Count,
+ fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1),
+ {VQ3, AcksR} = variable_queue_fetch(Count, false, false, Count, VQ2),
+ Acks = lists:reverse(AcksR),
+ AckSeqs = lists:zip(Acks, Seq),
+ [{Subset1, _Seq1}, {Subset2, _Seq2}, {Subset3, Seq3}] =
+ [lists:unzip(ack_subset(AckSeqs, Interval, I)) || I <- [0, 1, 2]],
+ %% we requeue in three phases in order to exercise requeuing logic
+ %% in various vq states
+ {_MsgIds, VQ4} = rabbit_variable_queue:requeue(
+ Acks -- (Subset1 ++ Subset2 ++ Subset3), VQ3),
+ VQ5 = requeue_one_by_one(Subset1, VQ4),
+ %% by now we have some messages (and holes) in delt
+ VQ6 = requeue_one_by_one(Subset2, VQ5),
+ VQ7 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ6),
+ %% add the q1 tail
+ VQ8 = variable_queue_publish(
+ true, Count + 1, 64,
+ fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ7),
+ %% assertions
+ [false = case V of
+ {delta, _, 0, _} -> true;
+ 0 -> true;
+ _ -> false
+ end || {K, V} <- rabbit_variable_queue:status(VQ8),
+ lists:member(K, [q1, delta, q3])],
+ Depth = Count + 64,
+ Depth = rabbit_variable_queue:depth(VQ8),
+ Len = Depth - length(Subset3),
+ Len = rabbit_variable_queue:len(VQ8),
+ {Seq3, Seq -- Seq3, lists:seq(Count + 1, Count + 64), VQ8}.
+
+test_variable_queue_requeue(VQ0) ->
+ {_PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} =
+ variable_queue_with_holes(VQ0),
+ Msgs =
+ lists:zip(RequeuedMsgs,
+ lists:duplicate(length(RequeuedMsgs), true)) ++
+ lists:zip(FreshMsgs,
+ lists:duplicate(length(FreshMsgs), false)),
+ VQ2 = lists:foldl(fun ({I, Requeued}, VQa) ->
+ {{M, MRequeued, _}, VQb} =
rabbit_variable_queue:fetch(true, VQa),
+ Requeued = MRequeued, %% assertion
+ I = msg2int(M), %% assertion
VQb
- end, VQ5, lists:reverse(Acks)),
- {empty, VQ7} = rabbit_variable_queue:fetch(true, VQ6),
- VQ7.
+ end, VQ1, Msgs),
+ {empty, VQ3} = rabbit_variable_queue:fetch(true, VQ2),
+ VQ3.
+
+test_variable_queue_purge(VQ0) ->
+ LenDepth = fun (VQ) ->
+ {rabbit_variable_queue:len(VQ),
+ rabbit_variable_queue:depth(VQ)}
+ end,
+ VQ1 = variable_queue_publish(false, 10, VQ0),
+ {VQ2, Acks} = variable_queue_fetch(6, false, false, 10, VQ1),
+ {4, VQ3} = rabbit_variable_queue:purge(VQ2),
+ {0, 6} = LenDepth(VQ3),
+ {_, VQ4} = rabbit_variable_queue:requeue(lists:sublist(Acks, 2), VQ3),
+ {2, 6} = LenDepth(VQ4),
+ VQ5 = rabbit_variable_queue:purge_acks(VQ4),
+ {2, 2} = LenDepth(VQ5),
+ VQ5.
test_variable_queue_ack_limiting(VQ0) ->
%% start by sending in a bunch of messages
@@ -2378,41 +2489,86 @@ test_variable_queue_ack_limiting(VQ0) ->
VQ6.
-test_dropwhile(VQ0) ->
+test_drop(VQ0) ->
+ %% start by sending a messages
+ VQ1 = variable_queue_publish(false, 1, VQ0),
+ %% drop message with AckRequired = true
+ {{MsgId, AckTag}, VQ2} = rabbit_variable_queue:drop(true, VQ1),
+ true = rabbit_variable_queue:is_empty(VQ2),
+ true = AckTag =/= undefinded,
+ %% drop again -> empty
+ {empty, VQ3} = rabbit_variable_queue:drop(false, VQ2),
+ %% requeue
+ {[MsgId], VQ4} = rabbit_variable_queue:requeue([AckTag], VQ3),
+ %% drop message with AckRequired = false
+ {{MsgId, undefined}, VQ5} = rabbit_variable_queue:drop(false, VQ4),
+ true = rabbit_variable_queue:is_empty(VQ5),
+ VQ5.
+
+test_dropfetchwhile(VQ0) ->
Count = 10,
%% add messages with sequential expiry
VQ1 = variable_queue_publish(
- false, Count,
- fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0),
+ false, 1, Count,
+ fun (N, Props) -> Props#message_properties{expiry = N} end,
+ fun erlang:term_to_binary/1, VQ0),
+
+ %% fetch the first 5 messages
+ {#message_properties{expiry = 6}, {Msgs, AckTags}, VQ2} =
+ rabbit_variable_queue:fetchwhile(
+ fun (#message_properties{expiry = Expiry}) -> Expiry =< 5 end,
+ fun (Msg, AckTag, {MsgAcc, AckAcc}) ->
+ {[Msg | MsgAcc], [AckTag | AckAcc]}
+ end, {[], []}, VQ1),
+ true = lists:seq(1, 5) == [msg2int(M) || M <- lists:reverse(Msgs)],
+
+ %% requeue them
+ {_MsgIds, VQ3} = rabbit_variable_queue:requeue(AckTags, VQ2),
%% drop the first 5 messages
- {_, undefined, VQ2} = rabbit_variable_queue:dropwhile(
- fun(#message_properties { expiry = Expiry }) ->
- Expiry =< 5
- end, false, VQ1),
-
- %% fetch five now
- VQ3 = lists:foldl(fun (_N, VQN) ->
- {{#basic_message{}, _, _, _}, VQM} =
+ {#message_properties{expiry = 6}, VQ4} =
+ rabbit_variable_queue:dropwhile(
+ fun (#message_properties {expiry = Expiry}) -> Expiry =< 5 end, VQ3),
+
+ %% fetch 5
+ VQ5 = lists:foldl(fun (N, VQN) ->
+ {{Msg, _, _}, VQM} =
rabbit_variable_queue:fetch(false, VQN),
+ true = msg2int(Msg) == N,
VQM
- end, VQ2, lists:seq(6, Count)),
+ end, VQ4, lists:seq(6, Count)),
%% should be empty now
- {empty, VQ4} = rabbit_variable_queue:fetch(false, VQ3),
+ true = rabbit_variable_queue:is_empty(VQ5),
- VQ4.
+ VQ5.
test_dropwhile_varying_ram_duration(VQ0) ->
+ test_dropfetchwhile_varying_ram_duration(
+ fun (VQ1) ->
+ {_, VQ2} = rabbit_variable_queue:dropwhile(
+ fun (_) -> false end, VQ1),
+ VQ2
+ end, VQ0).
+
+test_fetchwhile_varying_ram_duration(VQ0) ->
+ test_dropfetchwhile_varying_ram_duration(
+ fun (VQ1) ->
+ {_, ok, VQ2} = rabbit_variable_queue:fetchwhile(
+ fun (_) -> false end,
+ fun (_, _, A) -> A end,
+ ok, VQ1),
+ VQ2
+ end, VQ0).
+
+test_dropfetchwhile_varying_ram_duration(Fun, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1),
- {_, undefined, VQ3} = rabbit_variable_queue:dropwhile(
- fun(_) -> false end, false, VQ2),
+ VQ3 = Fun(VQ2),
VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
VQ5 = variable_queue_publish(false, 1, VQ4),
- {_, undefined, VQ6} =
- rabbit_variable_queue:dropwhile(fun(_) -> false end, false, VQ5),
+ VQ6 = Fun(VQ5),
VQ6.
test_variable_queue_dynamic_duration_change(VQ0) ->
@@ -2447,7 +2603,8 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
VQ0;
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
- {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
+ {{_Msg, false, AckTag}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
+ Len = rabbit_variable_queue:len(VQ2),
{_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2),
publish_fetch_and_ack(N-1, Len, VQ3).
@@ -2512,8 +2669,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5),
VQ7 = variable_queue_init(test_amqqueue(true), true),
- {{_Msg1, true, _AckTag1, Count1}, VQ8} =
- rabbit_variable_queue:fetch(true, VQ7),
+ {{_Msg1, true, _AckTag1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7),
+ Count1 = rabbit_variable_queue:len(VQ8),
VQ9 = variable_queue_publish(false, 1, VQ8),
VQ10 = rabbit_variable_queue:set_ram_duration_target(0, VQ9),
{VQ11, _AckTags2} = variable_queue_fetch(Count1, true, true, Count, VQ10),
@@ -2535,7 +2692,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
test_variable_queue_fold_msg_on_disk(VQ0) ->
VQ1 = variable_queue_publish(true, 1, VQ0),
{VQ2, AckTags} = variable_queue_fetch(1, true, false, 1, VQ1),
- VQ3 = rabbit_variable_queue:fold(fun (_M, _A) -> ok end, VQ2, AckTags),
+ {ok, VQ3} = rabbit_variable_queue:ackfold(fun (_M, _A, ok) -> ok end,
+ ok, VQ2, AckTags),
VQ3.
test_queue_recover() ->
@@ -2559,10 +2717,11 @@ test_queue_recover() ->
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
VQ1 = variable_queue_init(Q, true),
- {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
+ {{_Msg1, true, _AckTag1}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
+ CountMinusOne = rabbit_variable_queue:len(VQ2),
_VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2),
- rabbit_amqqueue:internal_delete(QName, QPid1)
+ rabbit_amqqueue:internal_delete(QName)
end),
passed.
diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl
index 59e53be7..432055d4 100644
--- a/src/rabbit_trace.erl
+++ b/src/rabbit_trace.erl
@@ -16,7 +16,7 @@
-module(rabbit_trace).
--export([init/1, tracing/1, tap_trace_in/2, tap_trace_out/2, start/1, stop/1]).
+-export([init/1, enabled/1, tap_in/2, tap_out/2, start/1, stop/1]).
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
@@ -31,9 +31,9 @@
-type(state() :: rabbit_types:exchange() | 'none').
-spec(init/1 :: (rabbit_types:vhost()) -> state()).
--spec(tracing/1 :: (rabbit_types:vhost()) -> boolean()).
--spec(tap_trace_in/2 :: (rabbit_types:basic_message(), state()) -> 'ok').
--spec(tap_trace_out/2 :: (rabbit_amqqueue:qmsg(), state()) -> 'ok').
+-spec(enabled/1 :: (rabbit_types:vhost()) -> boolean()).
+-spec(tap_in/2 :: (rabbit_types:basic_message(), state()) -> 'ok').
+-spec(tap_out/2 :: (rabbit_amqqueue:qmsg(), state()) -> 'ok').
-spec(start/1 :: (rabbit_types:vhost()) -> 'ok').
-spec(stop/1 :: (rabbit_types:vhost()) -> 'ok').
@@ -43,26 +43,26 @@
%%----------------------------------------------------------------------------
init(VHost) ->
- case tracing(VHost) of
+ case enabled(VHost) of
false -> none;
true -> {ok, X} = rabbit_exchange:lookup(
rabbit_misc:r(VHost, exchange, ?XNAME)),
X
end.
-tracing(VHost) ->
+enabled(VHost) ->
{ok, VHosts} = application:get_env(rabbit, ?TRACE_VHOSTS),
lists:member(VHost, VHosts).
-tap_trace_in(Msg = #basic_message{exchange_name = #resource{name = XName}},
- TraceX) ->
- maybe_trace(TraceX, Msg, <<"publish">>, XName, []).
+tap_in(_Msg, none) -> ok;
+tap_in(Msg = #basic_message{exchange_name = #resource{name = XName}}, TraceX) ->
+ trace(TraceX, Msg, <<"publish">>, XName, []).
-tap_trace_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg},
- TraceX) ->
+tap_out(_Msg, none) -> ok;
+tap_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg}, TraceX) ->
RedeliveredNum = case Redelivered of true -> 1; false -> 0 end,
- maybe_trace(TraceX, Msg, <<"deliver">>, QName,
- [{<<"redelivered">>, signedint, RedeliveredNum}]).
+ trace(TraceX, Msg, <<"deliver">>, QName,
+ [{<<"redelivered">>, signedint, RedeliveredNum}]).
%%----------------------------------------------------------------------------
@@ -83,14 +83,11 @@ update_config(Fun) ->
%%----------------------------------------------------------------------------
-maybe_trace(none, _Msg, _RKPrefix, _RKSuffix, _Extra) ->
+trace(#exchange{name = Name}, #basic_message{exchange_name = Name},
+ _RKPrefix, _RKSuffix, _Extra) ->
ok;
-maybe_trace(#exchange{name = Name}, #basic_message{exchange_name = Name},
- _RKPrefix, _RKSuffix, _Extra) ->
- ok;
-maybe_trace(X, Msg = #basic_message{content = #content{
- payload_fragments_rev = PFR}},
- RKPrefix, RKSuffix, Extra) ->
+trace(X, Msg = #basic_message{content = #content{payload_fragments_rev = PFR}},
+ RKPrefix, RKSuffix, Extra) ->
{ok, _, _} = rabbit_basic:publish(
X, <<RKPrefix/binary, ".", RKSuffix/binary>>,
#'P_basic'{headers = msg_to_table(Msg) ++ Extra}, PFR),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 1acc9ef0..faa1b0b1 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,12 +16,13 @@
-module(rabbit_variable_queue).
--export([init/3, terminate/2, delete_and_terminate/2, purge/1,
- publish/4, publish_delivered/4, discard/3, drain_confirmed/1,
- dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
- depth/1, set_ram_duration_target/2, ram_duration/1,
+-export([init/3, terminate/2, delete_and_terminate/2, purge/1, purge_acks/1,
+ publish/5, publish_delivered/4, discard/3, drain_confirmed/1,
+ dropwhile/2, fetchwhile/4,
+ fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1,
+ is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
- is_duplicate/2, multiple_routing_keys/0, fold/3]).
+ is_duplicate/2, multiple_routing_keys/0]).
-export([start/1, stop/0]).
@@ -254,9 +255,8 @@
q3,
q4,
next_seq_id,
- pending_ack,
- pending_ack_index,
- ram_ack_index,
+ ram_pending_ack,
+ disk_pending_ack,
index_state,
msg_store_clients,
durable,
@@ -348,8 +348,8 @@
q3 :: ?QUEUE:?QUEUE(),
q4 :: ?QUEUE:?QUEUE(),
next_seq_id :: seq_id(),
- pending_ack :: gb_tree(),
- ram_ack_index :: gb_tree(),
+ ram_pending_ack :: gb_tree(),
+ disk_pending_ack :: gb_tree(),
index_state :: any(),
msg_store_clients :: 'undefined' | {{any(), binary()},
{any(), binary()}},
@@ -519,18 +519,19 @@ purge(State = #vqstate { q4 = Q4,
ram_msg_count = 0,
persistent_count = PCount1 })}.
+purge_acks(State) -> a(purge_pending_ack(false, State)).
+
publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
MsgProps = #message_properties { needs_confirming = NeedsConfirming },
- _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
- next_seq_id = SeqId,
- len = Len,
- in_counter = InCount,
- persistent_count = PCount,
- durable = IsDurable,
- ram_msg_count = RamMsgCount,
- unconfirmed = UC }) ->
+ IsDelivered, _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
+ next_seq_id = SeqId,
+ len = Len,
+ in_counter = InCount,
+ persistent_count = PCount,
+ durable = IsDurable,
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = msg_status(IsPersistent1, SeqId, Msg, MsgProps),
+ MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps),
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = case ?QUEUE:is_empty(Q3) of
false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
@@ -538,12 +539,12 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
end,
PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- a(reduce_memory_use(State2 #vqstate { next_seq_id = SeqId + 1,
- len = Len + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1,
- ram_msg_count = RamMsgCount + 1,
- unconfirmed = UC1 })).
+ a(reduce_memory_use(
+ inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1,
+ len = Len + 1,
+ in_counter = InCount + 1,
+ persistent_count = PCount1,
+ unconfirmed = UC1 }))).
publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
id = MsgId },
@@ -557,8 +558,7 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
durable = IsDurable,
unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
- #msg_status { is_delivered = true },
+ MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps),
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = record_pending_ack(m(MsgStatus1), State1),
PCount1 = PCount + one_if(IsPersistent1),
@@ -579,27 +579,28 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
confirmed = gb_sets:new() }}
end.
-dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []).
+dropwhile(Pred, State) ->
+ case queue_out(State) of
+ {empty, State1} ->
+ {undefined, a(State1)};
+ {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
+ case Pred(MsgProps) of
+ true -> {_, State2} = remove(false, MsgStatus, State1),
+ dropwhile(Pred, State2);
+ false -> {MsgProps, a(in_r(MsgStatus, State1))}
+ end
+ end.
-dropwhile(Pred, AckRequired, State, Msgs) ->
- End = fun(Next, S) when AckRequired -> {Next, lists:reverse(Msgs), S};
- (Next, S) -> {Next, undefined, S}
- end,
+fetchwhile(Pred, Fun, Acc, State) ->
case queue_out(State) of
{empty, State1} ->
- End(undefined, a(State1));
+ {undefined, Acc, a(State1)};
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
- case {Pred(MsgProps), AckRequired} of
- {true, true} ->
- {MsgStatus1, State2} = read_msg(MsgStatus, State1),
- {{Msg, _, AckTag, _}, State3} =
- internal_fetch(true, MsgStatus1, State2),
- dropwhile(Pred, AckRequired, State3, [{Msg, AckTag} | Msgs]);
- {true, false} ->
- {_, State2} = internal_fetch(false, MsgStatus, State1),
- dropwhile(Pred, AckRequired, State2, undefined);
- {false, _} ->
- End(MsgProps, a(in_r(MsgStatus, State1)))
+ case Pred(MsgProps) of
+ true -> {Msg, State2} = read_msg(MsgStatus, State1),
+ {AckTag, State3} = remove(true, MsgStatus, State2),
+ fetchwhile(Pred, Fun, Fun(Msg, AckTag, Acc), State3);
+ false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))}
end
end.
@@ -610,9 +611,18 @@ fetch(AckRequired, State) ->
{{value, MsgStatus}, State1} ->
%% it is possible that the message wasn't read from disk
%% at this point, so read it in.
- {MsgStatus1, State2} = read_msg(MsgStatus, State1),
- {Res, State3} = internal_fetch(AckRequired, MsgStatus1, State2),
- {Res, a(State3)}
+ {Msg, State2} = read_msg(MsgStatus, State1),
+ {AckTag, State3} = remove(AckRequired, MsgStatus, State2),
+ {{Msg, MsgStatus#msg_status.is_delivered, AckTag}, a(State3)}
+ end.
+
+drop(AckRequired, State) ->
+ case queue_out(State) of
+ {empty, State1} ->
+ {empty, a(State1)};
+ {{value, MsgStatus}, State1} ->
+ {AckTag, State2} = remove(AckRequired, MsgStatus, State1),
+ {{MsgStatus#msg_status.msg_id, AckTag}, a(State2)}
end.
ack([], State) ->
@@ -638,16 +648,6 @@ ack(AckTags, State) ->
persistent_count = PCount1,
ack_out_counter = AckOutCount + length(AckTags) })}.
-fold(undefined, State, _AckTags) ->
- State;
-fold(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) ->
- a(lists:foldl(fun(SeqId, State1) ->
- {MsgStatus, State2} =
- read_msg(gb_trees:get(SeqId, PA), false, State1),
- MsgFun(MsgStatus#msg_status.msg, SeqId),
- State2
- end, State, AckTags)).
-
requeue(AckTags, #vqstate { delta = Delta,
q3 = Q3,
q4 = Q4,
@@ -669,12 +669,28 @@ requeue(AckTags, #vqstate { delta = Delta,
in_counter = InCounter + MsgCount,
len = Len + MsgCount }))}.
+ackfold(MsgFun, Acc, State, AckTags) ->
+ {AccN, StateN} =
+ lists:foldl(fun(SeqId, {Acc0, State0}) ->
+ MsgStatus = lookup_pending_ack(SeqId, State0),
+ {Msg, State1} = read_msg(MsgStatus, State0),
+ {MsgFun(Msg, SeqId, Acc0), State1}
+ end, {Acc, State}, AckTags),
+ {AccN, a(StateN)}.
+
+fold(Fun, Acc, State = #vqstate{index_state = IndexState}) ->
+ {Its, IndexState1} = lists:foldl(fun inext/2, {[], IndexState},
+ [msg_iterator(State),
+ disk_ack_iterator(State),
+ ram_ack_iterator(State)]),
+ ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}).
+
len(#vqstate { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
-depth(State = #vqstate { pending_ack = Ack }) ->
- len(State) + gb_trees:size(Ack).
+depth(State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) ->
+ len(State) + gb_trees:size(RPA) + gb_trees:size(DPA).
set_ram_duration_target(
DurationTarget, State = #vqstate {
@@ -711,7 +727,7 @@ ram_duration(State = #vqstate {
ack_out_counter = AckOutCount,
ram_msg_count = RamMsgCount,
ram_msg_count_prev = RamMsgCountPrev,
- ram_ack_index = RamAckIndex,
+ ram_pending_ack = RPA,
ram_ack_count_prev = RamAckCountPrev }) ->
Now = now(),
{AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress),
@@ -722,7 +738,7 @@ ram_duration(State = #vqstate {
{AvgAckIngressRate, AckIngress1} =
update_rate(Now, AckTimestamp, AckInCount, AckIngress),
- RamAckCount = gb_trees:size(RamAckIndex),
+ RamAckCount = gb_trees:size(RPA),
Duration = %% msgs+acks / (msgs+acks/sec) == sec
case (AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso
@@ -754,19 +770,24 @@ ram_duration(State = #vqstate {
ram_msg_count_prev = RamMsgCount,
ram_ack_count_prev = RamAckCount }}.
-needs_timeout(State = #vqstate { index_state = IndexState }) ->
+needs_timeout(State = #vqstate { index_state = IndexState,
+ target_ram_count = TargetRamCount }) ->
case must_sync_index(State) of
true -> timed;
false ->
case rabbit_queue_index:needs_sync(IndexState) of
true -> idle;
- false -> case reduce_memory_use(
- fun (_Quota, State1) -> {0, State1} end,
- fun (_Quota, State1) -> State1 end,
- fun (_Quota, State1) -> {0, State1} end,
- State) of
- {true, _State} -> idle;
- {false, _State} -> false
+ false -> case TargetRamCount of
+ infinity -> false;
+ _ -> case
+ reduce_memory_use(
+ fun (_Quota, State1) -> {0, State1} end,
+ fun (_Quota, State1) -> State1 end,
+ fun (_Quota, State1) -> {0, State1} end,
+ State) of
+ {true, _State} -> idle;
+ {false, _State} -> false
+ end
end
end
end.
@@ -782,8 +803,8 @@ handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
status(#vqstate {
q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len,
- pending_ack = PA,
- ram_ack_index = RAI,
+ ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
target_ram_count = TargetRamCount,
ram_msg_count = RamMsgCount,
next_seq_id = NextSeqId,
@@ -798,10 +819,10 @@ status(#vqstate {
{q3 , ?QUEUE:len(Q3)},
{q4 , ?QUEUE:len(Q4)},
{len , Len},
- {pending_acks , gb_trees:size(PA)},
+ {pending_acks , gb_trees:size(RPA) + gb_trees:size(DPA)},
{target_ram_count , TargetRamCount},
{ram_msg_count , RamMsgCount},
- {ram_ack_count , gb_trees:size(RAI)},
+ {ram_ack_count , gb_trees:size(RPA)},
{next_seq_id , NextSeqId},
{persistent_count , PersistentCount},
{avg_ingress_rate , AvgIngressRate},
@@ -862,16 +883,28 @@ cons_if(true, E, L) -> [E | L];
cons_if(false, _E, L) -> L.
gb_sets_maybe_insert(false, _Val, Set) -> Set;
-%% when requeueing, we re-add a msg_id to the unconfirmed set
-gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
-
-msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId },
- MsgProps = #message_properties { delivered = Delivered }) ->
- %% TODO would it make sense to remove #msg_status.is_delivered?
- #msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg,
- is_persistent = IsPersistent, is_delivered = Delivered,
- msg_on_disk = false, index_on_disk = false,
- msg_props = MsgProps }.
+gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
+
+msg_status(IsPersistent, IsDelivered, SeqId,
+ Msg = #basic_message {id = MsgId}, MsgProps) ->
+ #msg_status{seq_id = SeqId,
+ msg_id = MsgId,
+ msg = Msg,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = false,
+ index_on_disk = false,
+ msg_props = MsgProps}.
+
+beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) ->
+ #msg_status{seq_id = SeqId,
+ msg_id = MsgId,
+ msg = undefined,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = true,
+ index_on_disk = true,
+ msg_props = MsgProps}.
trim_msg_status(MsgStatus) -> MsgStatus #msg_status { msg = undefined }.
@@ -935,31 +968,21 @@ maybe_write_delivered(false, _SeqId, IndexState) ->
maybe_write_delivered(true, SeqId, IndexState) ->
rabbit_queue_index:deliver([SeqId], IndexState).
-betas_from_index_entries(List, TransientThreshold, PA, IndexState) ->
+betas_from_index_entries(List, TransientThreshold, RPA, DPA, IndexState) ->
{Filtered, Delivers, Acks} =
lists:foldr(
- fun ({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered},
+ fun ({_MsgId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
{Filtered1, Delivers1, Acks1} = Acc) ->
case SeqId < TransientThreshold andalso not IsPersistent of
true -> {Filtered1,
cons_if(not IsDelivered, SeqId, Delivers1),
[SeqId | Acks1]};
- false -> case gb_trees:is_defined(SeqId, PA) of
- false ->
- {?QUEUE:in_r(
- m(#msg_status {
- seq_id = SeqId,
- msg_id = MsgId,
- msg = undefined,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_on_disk = true,
- index_on_disk = true,
- msg_props = MsgProps
- }), Filtered1),
- Delivers1, Acks1};
- true ->
- Acc
+ false -> case (gb_trees:is_defined(SeqId, RPA) orelse
+ gb_trees:is_defined(SeqId, DPA)) of
+ false -> {?QUEUE:in_r(m(beta_msg_status(M)),
+ Filtered1),
+ Delivers1, Acks1};
+ true -> Acc
end
end
end, {?QUEUE:new(), [], []}, List),
@@ -1006,8 +1029,8 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
q3 = ?QUEUE:new(),
q4 = ?QUEUE:new(),
next_seq_id = NextSeqId,
- pending_ack = gb_trees:empty(),
- ram_ack_index = gb_trees:empty(),
+ ram_pending_ack = gb_trees:empty(),
+ disk_pending_ack = gb_trees:empty(),
index_state = IndexState1,
msg_store_clients = {PersistentClient, TransientClient},
durable = IsDurable,
@@ -1045,9 +1068,11 @@ in_r(MsgStatus = #msg_status { msg = undefined },
State = #vqstate { q3 = Q3, q4 = Q4 }) ->
case ?QUEUE:is_empty(Q4) of
true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
- false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} =
+ false -> {Msg, State1 = #vqstate { q4 = Q4a }} =
read_msg(MsgStatus, State),
- State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) }
+ inc_ram_msg_count(
+ State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
+ msg = Msg }, Q4a) })
end;
in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }.
@@ -1063,35 +1088,35 @@ queue_out(State = #vqstate { q4 = Q4 }) ->
{{value, MsgStatus}, State #vqstate { q4 = Q4a }}
end.
-read_msg(MsgStatus, State) -> read_msg(MsgStatus, true, State).
+read_msg(#msg_status{msg = undefined,
+ msg_id = MsgId,
+ is_persistent = IsPersistent}, State) ->
+ read_msg(MsgId, IsPersistent, State);
+read_msg(#msg_status{msg = Msg}, State) ->
+ {Msg, State}.
-read_msg(MsgStatus = #msg_status { msg = undefined,
- msg_id = MsgId,
- is_persistent = IsPersistent },
- CountDiskToRam, State = #vqstate { ram_msg_count = RamMsgCount,
- msg_store_clients = MSCState}) ->
+read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) ->
{{ok, Msg = #basic_message {}}, MSCState1} =
msg_store_read(MSCState, IsPersistent, MsgId),
- {MsgStatus #msg_status { msg = Msg },
- State #vqstate { ram_msg_count = RamMsgCount + one_if(CountDiskToRam),
- msg_store_clients = MSCState1 }};
-read_msg(MsgStatus, _CountDiskToRam, State) ->
- {MsgStatus, State}.
-
-internal_fetch(AckRequired, MsgStatus = #msg_status {
- seq_id = SeqId,
- msg_id = MsgId,
- msg = Msg,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_on_disk = MsgOnDisk,
- index_on_disk = IndexOnDisk },
- State = #vqstate {ram_msg_count = RamMsgCount,
- out_counter = OutCount,
- index_state = IndexState,
- msg_store_clients = MSCState,
- len = Len,
- persistent_count = PCount }) ->
+ {Msg, State #vqstate {msg_store_clients = MSCState1}}.
+
+inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) ->
+ State#vqstate{ram_msg_count = RamMsgCount + 1}.
+
+remove(AckRequired, MsgStatus = #msg_status {
+ seq_id = SeqId,
+ msg_id = MsgId,
+ msg = Msg,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = MsgOnDisk,
+ index_on_disk = IndexOnDisk },
+ State = #vqstate {ram_msg_count = RamMsgCount,
+ out_counter = OutCount,
+ index_state = IndexState,
+ msg_store_clients = MSCState,
+ len = Len,
+ persistent_count = PCount}) ->
%% 1. Mark it delivered if necessary
IndexState1 = maybe_write_delivered(
IndexOnDisk andalso not IsDelivered,
@@ -1102,12 +1127,11 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
ok = msg_store_remove(MSCState, IsPersistent, [MsgId])
end,
Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
- IndexState2 =
- case {AckRequired, MsgOnDisk, IndexOnDisk} of
- {false, true, false} -> Rem(), IndexState1;
- {false, true, true} -> Rem(), Ack();
- _ -> IndexState1
- end,
+ IndexState2 = case {AckRequired, MsgOnDisk, IndexOnDisk} of
+ {false, true, false} -> Rem(), IndexState1;
+ {false, true, true} -> Rem(), Ack();
+ _ -> IndexState1
+ end,
%% 3. If an ack is required, add something sensible to PA
{AckTag, State1} = case AckRequired of
@@ -1118,16 +1142,14 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
false -> {undefined, State}
end,
- PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
- Len1 = Len - 1,
+ PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
- {{Msg, IsDelivered, AckTag, Len1},
- State1 #vqstate { ram_msg_count = RamMsgCount1,
- out_counter = OutCount + 1,
- index_state = IndexState2,
- len = Len1,
- persistent_count = PCount1 }}.
+ {AckTag, State1 #vqstate {ram_msg_count = RamMsgCount1,
+ out_counter = OutCount + 1,
+ index_state = IndexState2,
+ len = Len - 1,
+ persistent_count = PCount1}}.
purge_betas_and_deltas(LensByStore,
State = #vqstate { q3 = Q3,
@@ -1224,37 +1246,48 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
%% Internal gubbins for acks
%%----------------------------------------------------------------------------
-record_pending_ack(#msg_status { seq_id = SeqId,
- msg_id = MsgId,
- msg_on_disk = MsgOnDisk } = MsgStatus,
- State = #vqstate { pending_ack = PA,
- ram_ack_index = RAI,
- ack_in_counter = AckInCount}) ->
- {AckEntry, RAI1} =
- case MsgOnDisk of
- true -> {m(trim_msg_status(MsgStatus)), RAI};
- false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)}
+record_pending_ack(#msg_status { seq_id = SeqId, msg = Msg } = MsgStatus,
+ State = #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
+ ack_in_counter = AckInCount}) ->
+ {RPA1, DPA1} =
+ case Msg of
+ undefined -> {RPA, gb_trees:insert(SeqId, MsgStatus, DPA)};
+ _ -> {gb_trees:insert(SeqId, MsgStatus, RPA), DPA}
end,
- State #vqstate { pending_ack = gb_trees:insert(SeqId, AckEntry, PA),
- ram_ack_index = RAI1,
- ack_in_counter = AckInCount + 1}.
+ State #vqstate { ram_pending_ack = RPA1,
+ disk_pending_ack = DPA1,
+ ack_in_counter = AckInCount + 1}.
+
+lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA }) ->
+ case gb_trees:lookup(SeqId, RPA) of
+ {value, V} -> V;
+ none -> gb_trees:get(SeqId, DPA)
+ end.
-remove_pending_ack(SeqId, State = #vqstate { pending_ack = PA,
- ram_ack_index = RAI }) ->
- {gb_trees:get(SeqId, PA),
- State #vqstate { pending_ack = gb_trees:delete(SeqId, PA),
- ram_ack_index = gb_trees:delete_any(SeqId, RAI) }}.
+remove_pending_ack(SeqId, State = #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA }) ->
+ case gb_trees:lookup(SeqId, RPA) of
+ {value, V} -> RPA1 = gb_trees:delete(SeqId, RPA),
+ {V, State #vqstate { ram_pending_ack = RPA1 }};
+ none -> DPA1 = gb_trees:delete(SeqId, DPA),
+ {gb_trees:get(SeqId, DPA),
+ State #vqstate { disk_pending_ack = DPA1 }}
+ end.
purge_pending_ack(KeepPersistent,
- State = #vqstate { pending_ack = PA,
+ State = #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
+ F = fun (_SeqId, MsgStatus, Acc) -> accumulate_ack(MsgStatus, Acc) end,
{IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} =
- rabbit_misc:gb_trees_fold(fun (_SeqId, MsgStatus, Acc) ->
- accumulate_ack(MsgStatus, Acc)
- end, accumulate_ack_init(), PA),
- State1 = State #vqstate { pending_ack = gb_trees:empty(),
- ram_ack_index = gb_trees:empty() },
+ rabbit_misc:gb_trees_fold(
+ F, rabbit_misc:gb_trees_fold(F, accumulate_ack_init(), RPA), DPA),
+ State1 = State #vqstate { ram_pending_ack = gb_trees:empty(),
+ disk_pending_ack = gb_trees:empty() },
+
case KeepPersistent of
true -> case orddict:find(false, MsgIdsByStore) of
error -> State1;
@@ -1351,9 +1384,10 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
%%----------------------------------------------------------------------------
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
- read_msg(MsgStatus, State);
-publish_alpha(MsgStatus, #vqstate {ram_msg_count = RamMsgCount } = State) ->
- {MsgStatus, State #vqstate { ram_msg_count = RamMsgCount + 1 }}.
+ {Msg, State1} = read_msg(MsgStatus, State),
+ {MsgStatus#msg_status { msg = Msg }, inc_ram_msg_count(State1)};
+publish_alpha(MsgStatus, State) ->
+ {MsgStatus, inc_ram_msg_count(State)}.
publish_beta(MsgStatus, State) ->
{#msg_status { msg = Msg} = MsgStatus1,
@@ -1417,6 +1451,82 @@ delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;
delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
%%----------------------------------------------------------------------------
+%% Iterator
+%%----------------------------------------------------------------------------
+
+ram_ack_iterator(State) ->
+ {ack, gb_trees:iterator(State#vqstate.ram_pending_ack)}.
+
+disk_ack_iterator(State) ->
+ {ack, gb_trees:iterator(State#vqstate.disk_pending_ack)}.
+
+msg_iterator(State) -> istate(start, State).
+
+istate(start, State) -> {q4, State#vqstate.q4, State};
+istate(q4, State) -> {q3, State#vqstate.q3, State};
+istate(q3, State) -> {delta, State#vqstate.delta, State};
+istate(delta, State) -> {q2, State#vqstate.q2, State};
+istate(q2, State) -> {q1, State#vqstate.q1, State};
+istate(q1, _State) -> done.
+
+next({ack, It}, IndexState) ->
+ case gb_trees:next(It) of
+ none -> {empty, IndexState};
+ {_SeqId, MsgStatus, It1} -> Next = {ack, It1},
+ {value, MsgStatus, true, Next, IndexState}
+ end;
+next(done, IndexState) -> {empty, IndexState};
+next({delta, #delta{start_seq_id = SeqId,
+ end_seq_id = SeqId}, State}, IndexState) ->
+ next(istate(delta, State), IndexState);
+next({delta, #delta{start_seq_id = SeqId,
+ end_seq_id = SeqIdEnd} = Delta, State}, IndexState) ->
+ SeqIdB = rabbit_queue_index:next_segment_boundary(SeqId),
+ SeqId1 = lists:min([SeqIdB, SeqIdEnd]),
+ {List, IndexState1} = rabbit_queue_index:read(SeqId, SeqId1, IndexState),
+ next({delta, Delta#delta{start_seq_id = SeqId1}, List, State}, IndexState1);
+next({delta, Delta, [], State}, IndexState) ->
+ next({delta, Delta, State}, IndexState);
+next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) ->
+ case (gb_trees:is_defined(SeqId, State#vqstate.ram_pending_ack) orelse
+ gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack)) of
+ false -> Next = {delta, Delta, Rest, State},
+ {value, beta_msg_status(M), false, Next, IndexState};
+ true -> next({delta, Delta, Rest, State}, IndexState)
+ end;
+next({Key, Q, State}, IndexState) ->
+ case ?QUEUE:out(Q) of
+ {empty, _Q} -> next(istate(Key, State), IndexState);
+ {{value, MsgStatus}, QN} -> Next = {Key, QN, State},
+ {value, MsgStatus, false, Next, IndexState}
+ end.
+
+inext(It, {Its, IndexState}) ->
+ case next(It, IndexState) of
+ {empty, IndexState1} ->
+ {Its, IndexState1};
+ {value, MsgStatus1, Unacked, It1, IndexState1} ->
+ {[{MsgStatus1, Unacked, It1} | Its], IndexState1}
+ end.
+
+ifold(_Fun, Acc, [], State) ->
+ {Acc, State};
+ifold(Fun, Acc, Its, State) ->
+ [{MsgStatus, Unacked, It} | Rest] =
+ lists:sort(fun ({#msg_status{seq_id = SeqId1}, _, _},
+ {#msg_status{seq_id = SeqId2}, _, _}) ->
+ SeqId1 =< SeqId2
+ end, Its),
+ {Msg, State1} = read_msg(MsgStatus, State),
+ case Fun(Msg, MsgStatus#msg_status.msg_props, Unacked, Acc) of
+ {stop, Acc1} ->
+ {Acc1, State};
+ {cont, Acc1} ->
+ {Its1, IndexState1} = inext(It, {Rest, State1#vqstate.index_state}),
+ ifold(Fun, Acc1, Its1, State1#vqstate{index_state = IndexState1})
+ end.
+
+%%----------------------------------------------------------------------------
%% Phase changes
%%----------------------------------------------------------------------------
@@ -1439,12 +1549,9 @@ delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
%% one segment's worth of messages in q3 - and thus would risk
%% perpetually reporting the need for a conversion when no such
%% conversion is needed. That in turn could cause an infinite loop.
-reduce_memory_use(_AlphaBetaFun, _BetaDeltaFun, _AckFun,
- State = #vqstate {target_ram_count = infinity}) ->
- {false, State};
reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
State = #vqstate {
- ram_ack_index = RamAckIndex,
+ ram_pending_ack = RPA,
ram_msg_count = RamMsgCount,
target_ram_count = TargetRamCount,
rates = #rates { avg_ingress = AvgIngress,
@@ -1454,8 +1561,7 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
}) ->
{Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} =
- case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex),
- TargetRamCount) of
+ case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
0 -> {false, State};
%% Reduce memory of pending acks and alphas. The order is
%% determined based on which is growing faster. Whichever
@@ -1480,23 +1586,23 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
limit_ram_acks(0, State) ->
{0, State};
-limit_ram_acks(Quota, State = #vqstate { pending_ack = PA,
- ram_ack_index = RAI }) ->
- case gb_trees:is_empty(RAI) of
+limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA }) ->
+ case gb_trees:is_empty(RPA) of
true ->
{Quota, State};
false ->
- {SeqId, MsgId, RAI1} = gb_trees:take_largest(RAI),
- MsgStatus = #msg_status { msg_id = MsgId, is_persistent = false} =
- gb_trees:get(SeqId, PA),
+ {SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA),
{MsgStatus1, State1} =
maybe_write_to_disk(true, false, MsgStatus, State),
- PA1 = gb_trees:update(SeqId, m(trim_msg_status(MsgStatus1)), PA),
+ DPA1 = gb_trees:insert(SeqId, m(trim_msg_status(MsgStatus1)), DPA),
limit_ram_acks(Quota - 1,
- State1 #vqstate { pending_ack = PA1,
- ram_ack_index = RAI1 })
+ State1 #vqstate { ram_pending_ack = RPA1,
+ disk_pending_ack = DPA1 })
end.
+reduce_memory_use(State = #vqstate { target_ram_count = infinity }) ->
+ State;
reduce_memory_use(State) ->
{_, State1} = reduce_memory_use(fun push_alphas_to_betas/2,
fun push_betas_to_deltas/2,
@@ -1562,7 +1668,8 @@ maybe_deltas_to_betas(State = #vqstate {
delta = Delta,
q3 = Q3,
index_state = IndexState,
- pending_ack = PA,
+ ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
transient_threshold = TransientThreshold }) ->
#delta { start_seq_id = DeltaSeqId,
count = DeltaCount,
@@ -1570,10 +1677,10 @@ maybe_deltas_to_betas(State = #vqstate {
DeltaSeqId1 =
lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
DeltaSeqIdEnd]),
- {List, IndexState1} =
- rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState),
- {Q3a, IndexState2} =
- betas_from_index_entries(List, TransientThreshold, PA, IndexState1),
+ {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
+ IndexState),
+ {Q3a, IndexState2} = betas_from_index_entries(List, TransientThreshold,
+ RPA, DPA, IndexState1),
State1 = State #vqstate { index_state = IndexState2 },
case ?QUEUE:len(Q3a) of
0 ->
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 18742ccf..8d2cbc41 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -123,7 +123,7 @@ with(VHostPath, Thunk) ->
infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items].
i(name, VHost) -> VHost;
-i(tracing, VHost) -> rabbit_trace:tracing(VHost);
+i(tracing, VHost) -> rabbit_trace:enabled(VHost);
i(Item, _) -> throw({bad_argument, Item}).
info(VHost) -> infos(?INFO_KEYS, VHost).