summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-01-16 10:56:32 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-01-16 10:56:32 +0000
commit20d0287cb31e6764e5018e6e18c257090223b252 (patch)
tree0c9dbde5d02cbcc1bef6ca3c976348df7616611c
parent88c37f00b352a8720190ad33efde29d5db165be3 (diff)
parent86f37f1c3cd7102c79880186c7fa5f866f265181 (diff)
downloadrabbitmq-server-20d0287cb31e6764e5018e6e18c257090223b252.tar.gz
Merge bug25393
-rwxr-xr-xcheck_xref14
-rw-r--r--docs/rabbitmqctl.1.xml26
-rw-r--r--src/gen_server2.erl2
-rw-r--r--src/rabbit_alarm.erl17
-rw-r--r--src/rabbit_amqqueue.erl7
-rw-r--r--src/rabbit_amqqueue_process.erl31
-rw-r--r--src/rabbit_control_main.erl21
-rw-r--r--src/rabbit_mirror_queue_master.erl13
-rw-r--r--src/rabbit_mirror_queue_sync.erl45
-rw-r--r--src/rabbit_reader.erl26
-rw-r--r--src/rabbit_tests.erl111
-rw-r--r--src/rabbit_variable_queue.erl111
12 files changed, 283 insertions, 141 deletions
diff --git a/check_xref b/check_xref
index 8f65f3b1..ea0102ee 100755
--- a/check_xref
+++ b/check_xref
@@ -50,6 +50,7 @@ shutdown(Rc, LibDir) ->
check(Cwd, PluginsDir, LibDir, Checks) ->
{ok, Plugins} = file:list_dir(PluginsDir),
ok = file:make_dir(LibDir),
+ put({?MODULE, third_party}, []),
[begin
Source = filename:join(PluginsDir, Plugin),
Target = filename:join(LibDir, Plugin),
@@ -162,7 +163,8 @@ filters() ->
filter_chain(FnChain) ->
fun(AnalysisResult) ->
- lists:foldl(fun(F, false) -> F(cleanup(AnalysisResult));
+ Result = cleanup(AnalysisResult),
+ lists:foldl(fun(F, false) -> F(Result);
(_F, true) -> true
end, false, FnChain)
end.
@@ -267,14 +269,8 @@ source_file(M) ->
store_third_party(App) ->
{ok, AppConfig} = application:get_all_key(App),
- case get({?MODULE, third_party}) of
- undefined ->
- put({?MODULE, third_party},
- proplists:get_value(modules, AppConfig));
- Modules ->
- put({?MODULE, third_party},
- proplists:get_value(modules, AppConfig) ++ Modules)
- end.
+ AppModules = proplists:get_value(modules, AppConfig),
+ put({?MODULE, third_party}, AppModules ++ get({?MODULE, third_party})).
%% TODO: this ought not to be maintained in such a fashion
external_dependency(Path) ->
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index a95f7b3d..c7069aed 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -476,6 +476,26 @@
</para>
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>cancel_sync_queue</command> <arg choice="req">queue</arg></cmdsynopsis>
+ </term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>queue</term>
+ <listitem>
+ <para>
+ The name of the queue to cancel synchronisation for.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ Instructs a synchronising mirrored queue to stop
+ synchronising itself.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</refsect2>
@@ -1139,6 +1159,12 @@
i.e. those which could take over from the master without
message loss.</para></listitem>
</varlistentry>
+ <varlistentry>
+ <term>status</term>
+ <listitem><para>The status of the queue. Normally
+ 'running', but may be "{syncing, MsgCount}" if the queue is
+ synchronising.</para></listitem>
+ </varlistentry>
</variablelist>
<para>
If no <command>queueinfoitem</command>s are specified then queue name and depth are
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index dc55948b..20de79c2 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -863,7 +863,7 @@ dispatch(Info, Mod, State) ->
common_reply(_Name, From, Reply, _NState, [] = _Debug) ->
reply(From, Reply),
[];
-common_reply(Name, {To, Tag} = From, Reply, NState, Debug) ->
+common_reply(Name, {To, _Tag} = From, Reply, NState, Debug) ->
reply(From, Reply),
sys:handle_debug(Debug, fun print_event/3, Name, {out, Reply, To, NState}).
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index d7d4d82a..f813eab8 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -67,9 +67,8 @@ start() ->
stop() -> ok.
-register(Pid, HighMemMFA) ->
- gen_event:call(?SERVER, ?MODULE, {register, Pid, HighMemMFA},
- infinity).
+register(Pid, AlertMFA) ->
+ gen_event:call(?SERVER, ?MODULE, {register, Pid, AlertMFA}, infinity).
set_alarm(Alarm) -> gen_event:notify(?SERVER, {set_alarm, Alarm}).
clear_alarm(Alarm) -> gen_event:notify(?SERVER, {clear_alarm, Alarm}).
@@ -94,9 +93,9 @@ init([]) ->
alarmed_nodes = dict:new(),
alarms = []}}.
-handle_call({register, Pid, HighMemMFA}, State) ->
+handle_call({register, Pid, AlertMFA}, State) ->
{ok, 0 < dict:size(State#alarms.alarmed_nodes),
- internal_register(Pid, HighMemMFA, State)};
+ internal_register(Pid, AlertMFA, State)};
handle_call(get_alarms, State = #alarms{alarms = Alarms}) ->
{ok, Alarms, State};
@@ -121,8 +120,8 @@ handle_event({node_up, Node}, State) ->
handle_event({node_down, Node}, State) ->
{ok, maybe_alert(fun dict_unappend_all/3, Node, [], State)};
-handle_event({register, Pid, HighMemMFA}, State) ->
- {ok, internal_register(Pid, HighMemMFA, State)};
+handle_event({register, Pid, AlertMFA}, State) ->
+ {ok, internal_register(Pid, AlertMFA, State)};
handle_event(_Event, State) ->
{ok, State}.
@@ -198,14 +197,14 @@ alert(Alertees, Source, Alert, NodeComparator) ->
end
end, ok, Alertees).
-internal_register(Pid, {M, F, A} = HighMemMFA,
+internal_register(Pid, {M, F, A} = AlertMFA,
State = #alarms{alertees = Alertees}) ->
_MRef = erlang:monitor(process, Pid),
case dict:find(node(), State#alarms.alarmed_nodes) of
{ok, Sources} -> [apply(M, F, A ++ [Pid, R, true]) || R <- Sources];
error -> ok
end,
- NewAlertees = dict:store(Pid, HighMemMFA, Alertees),
+ NewAlertees = dict:store(Pid, AlertMFA, Alertees),
State#alarms{alertees = NewAlertees}.
handle_set_alarm({{resource_limit, Source, Node}, []}, State) ->
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 94150f1c..2477b891 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,7 +31,8 @@
-export([notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-export([update/2, store_queue/1, policy_changed/2]).
--export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1]).
+-export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1,
+ cancel_sync_mirrors/1]).
%% internal
-export([internal_declare/2, internal_delete/1, run_backing_queue/3,
@@ -175,6 +176,7 @@
-spec(stop_mirroring/1 :: (pid()) -> 'ok').
-spec(sync_mirrors/1 :: (pid()) ->
'ok' | rabbit_types:error('pending_acks' | 'not_mirrored')).
+-spec(cancel_sync_mirrors/1 :: (pid()) -> 'ok' | {'ok', 'not_syncing'}).
-endif.
@@ -602,7 +604,8 @@ set_maximum_since_use(QPid, Age) ->
start_mirroring(QPid) -> ok = delegate:cast(QPid, start_mirroring).
stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring).
-sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors).
+sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors).
+cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mirrors).
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2293d001..0a07a005 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -54,7 +54,8 @@
delayed_stop,
queue_monitors,
dlx,
- dlx_routing_key
+ dlx_routing_key,
+ status
}).
-record(consumer, {tag, ack_required}).
@@ -97,7 +98,8 @@
memory,
slave_pids,
synchronised_slave_pids,
- backing_queue_status
+ backing_queue_status,
+ status
]).
-define(CREATION_EVENT_KEYS,
@@ -149,7 +151,8 @@ init_state(Q) ->
publish_seqno = 1,
unconfirmed = dtree:empty(),
queue_monitors = pmon:new(),
- msg_id_to_channel = gb_trees:empty()},
+ msg_id_to_channel = gb_trees:empty(),
+ status = running},
rabbit_event:init_stats_timer(State, #q.stats_timer).
terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
@@ -938,6 +941,8 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) ->
false -> '';
true -> SSPids
end;
+i(status, #q{status = Status}) ->
+ Status;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
i(Item, _) ->
@@ -1161,8 +1166,22 @@ handle_call(sync_mirrors, _From,
State = #q{backing_queue = rabbit_mirror_queue_master = BQ,
backing_queue_state = BQS}) ->
S = fun(BQSN) -> State#q{backing_queue_state = BQSN} end,
+ HandleInfo = fun (Status) ->
+ receive {'$gen_call', From, {info, Items}} ->
+ Infos = infos(Items, State#q{status = Status}),
+ gen_server2:reply(From, {ok, Infos})
+ after 0 ->
+ ok
+ end
+ end,
+ EmitStats = fun (Status) ->
+ rabbit_event:if_enabled(
+ State, #q.stats_timer,
+ fun() -> emit_stats(State#q{status = Status}) end)
+ end,
case BQ:depth(BQS) - BQ:len(BQS) of
- 0 -> case rabbit_mirror_queue_master:sync_mirrors(BQS) of
+ 0 -> case rabbit_mirror_queue_master:sync_mirrors(
+ HandleInfo, EmitStats, BQS) of
{ok, BQS1} -> reply(ok, S(BQS1));
{stop, Reason, BQS1} -> {stop, Reason, S(BQS1)}
end;
@@ -1172,6 +1191,10 @@ handle_call(sync_mirrors, _From,
handle_call(sync_mirrors, _From, State) ->
reply({error, not_mirrored}, State);
+%% By definition if we get this message here we do not have to do anything.
+handle_call(cancel_sync_mirrors, _From, State) ->
+ reply({ok, not_syncing}, State);
+
handle_call(force_event_refresh, _From,
State = #q{exclusive_consumer = Exclusive}) ->
rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 6ccd5011..fc9c41a4 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -17,7 +17,7 @@
-module(rabbit_control_main).
-include("rabbit.hrl").
--export([start/0, stop/0, action/5, sync_queue/1]).
+-export([start/0, stop/0, action/5, sync_queue/1, cancel_sync_queue/1]).
-define(RPC_TIMEOUT, infinity).
-define(EXTERNAL_CHECK_INTERVAL, 1000).
@@ -51,6 +51,7 @@
{forget_cluster_node, [?OFFLINE_DEF]},
cluster_status,
{sync_queue, [?VHOST_DEF]},
+ {cancel_sync_queue, [?VHOST_DEF]},
add_user,
delete_user,
@@ -160,6 +161,12 @@ start() ->
false -> io:format("...done.~n")
end,
rabbit_misc:quit(0);
+ {ok, Info} ->
+ case Quiet of
+ true -> ok;
+ false -> io:format("...done (~p).~n", [Info])
+ end,
+ rabbit_misc:quit(0);
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> %% < R15
PrintInvalidCommandError(),
usage();
@@ -287,6 +294,12 @@ action(sync_queue, Node, [Q], Opts, Inform) ->
Inform("Synchronising ~s", [rabbit_misc:rs(QName)]),
rpc_call(Node, rabbit_control_main, sync_queue, [QName]);
+action(cancel_sync_queue, Node, [Q], Opts, Inform) ->
+ VHost = proplists:get_value(?VHOST_OPT, Opts),
+ QName = rabbit_misc:r(list_to_binary(VHost), queue, list_to_binary(Q)),
+ Inform("Stopping synchronising ~s", [rabbit_misc:rs(QName)]),
+ rpc_call(Node, rabbit_control_main, cancel_sync_queue, [QName]);
+
action(wait, Node, [PidFile], _Opts, Inform) ->
Inform("Waiting for ~p", [Node]),
wait_for_application(Node, PidFile, rabbit_and_plugins, Inform);
@@ -524,6 +537,12 @@ sync_queue(Q) ->
rabbit_amqqueue:with(
Q, fun(#amqqueue{pid = QPid}) -> rabbit_amqqueue:sync_mirrors(QPid) end).
+cancel_sync_queue(Q) ->
+ rabbit_amqqueue:with(
+ Q, fun(#amqqueue{pid = QPid}) ->
+ rabbit_amqqueue:cancel_sync_mirrors(QPid)
+ end).
+
%%----------------------------------------------------------------------------
wait_for_application(Node, PidFile, Application, Inform) ->
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 0df7ea1c..b5f72cad 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -28,7 +28,7 @@
-export([promote_backing_queue_state/8, sender_death_fun/0, depth_fun/0]).
--export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/1]).
+-export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/3]).
-behaviour(rabbit_backing_queue).
@@ -46,10 +46,11 @@
-ifdef(use_specs).
--export_type([death_fun/0, depth_fun/0]).
+-export_type([death_fun/0, depth_fun/0, stats_fun/0]).
-type(death_fun() :: fun ((pid()) -> 'ok')).
-type(depth_fun() :: fun (() -> 'ok')).
+-type(stats_fun() :: fun ((any()) -> 'ok')).
-type(master_state() :: #state { name :: rabbit_amqqueue:name(),
gm :: pid(),
coordinator :: pid(),
@@ -68,7 +69,7 @@
-spec(init_with_existing_bq/3 :: (rabbit_types:amqqueue(), atom(), any()) ->
master_state()).
-spec(stop_mirroring/1 :: (master_state()) -> {atom(), any()}).
--spec(sync_mirrors/1 :: (master_state()) ->
+-spec(sync_mirrors/3 :: (stats_fun(), stats_fun(), master_state()) ->
{'ok', master_state()} | {stop, any(), master_state()}).
-endif.
@@ -126,7 +127,8 @@ stop_mirroring(State = #state { coordinator = CPid,
stop_all_slaves(shutdown, State),
{BQ, BQS}.
-sync_mirrors(State = #state { name = QName,
+sync_mirrors(HandleInfo, EmitStats,
+ State = #state { name = QName,
gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -140,7 +142,8 @@ sync_mirrors(State = #state { name = QName,
Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, Log, SPids),
gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}),
S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end,
- case rabbit_mirror_queue_sync:master_go(Syncer, Ref, Log, BQ, BQS) of
+ case rabbit_mirror_queue_sync:master_go(
+ Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) of
{shutdown, R, BQS1} -> {stop, R, S(BQS1)};
{sync_died, R, BQS1} -> Log("~p", [R]),
{ok, S(BQS1)};
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index 508d46e9..f2ab67cd 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
--export([master_prepare/3, master_go/5, slave/7]).
+-export([master_prepare/3, master_go/7, slave/7]).
-define(SYNC_PROGRESS_INTERVAL, 1000000).
@@ -59,7 +59,10 @@
-type(bqs() :: any()).
-spec(master_prepare/3 :: (reference(), log_fun(), [pid()]) -> pid()).
--spec(master_go/5 :: (pid(), reference(), log_fun(), bq(), bqs()) ->
+-spec(master_go/7 :: (pid(), reference(), log_fun(),
+ rabbit_mirror_queue_master:stats_fun(),
+ rabbit_mirror_queue_master:stats_fun(),
+ bq(), bqs()) ->
{'already_synced', bqs()} | {'ok', bqs()} |
{'shutdown', any(), bqs()} |
{'sync_died', any(), bqs()}).
@@ -78,12 +81,13 @@ master_prepare(Ref, Log, SPids) ->
MPid = self(),
spawn_link(fun () -> syncer(Ref, Log, MPid, SPids) end).
-master_go(Syncer, Ref, Log, BQ, BQS) ->
- Args = {Syncer, Ref, Log, rabbit_misc:get_parent()},
+master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) ->
+ Args = {Syncer, Ref, Log, HandleInfo, EmitStats, rabbit_misc:get_parent()},
receive
{'EXIT', Syncer, normal} -> {already_synced, BQS};
{'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS};
- {ready, Syncer} -> master_go0(Args, BQ, BQS)
+ {ready, Syncer} -> EmitStats({syncing, 0}),
+ master_go0(Args, BQ, BQS)
end.
master_go0(Args, BQ, BQS) ->
@@ -95,12 +99,15 @@ master_go0(Args, BQ, BQS) ->
{_, BQS1} -> master_done(Args, BQS1)
end.
-master_send(Msg, MsgProps, {Syncer, Ref, Log, Parent}, {I, Last}) ->
+master_send(Msg, MsgProps, {Syncer, Ref, Log, HandleInfo, EmitStats, Parent},
+ {I, Last}) ->
T = case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of
- true -> Log("~p messages", [I]),
+ true -> EmitStats({syncing, I}),
+ Log("~p messages", [I]),
erlang:now();
false -> Last
end,
+ HandleInfo({syncing, I}),
receive
{'$gen_cast', {set_maximum_since_use, Age}} ->
ok = file_handle_cache:set_maximum_since_use(Age)
@@ -108,24 +115,31 @@ master_send(Msg, MsgProps, {Syncer, Ref, Log, Parent}, {I, Last}) ->
ok
end,
receive
+ {'$gen_call', From,
+ cancel_sync_mirrors} -> stop_syncer(Syncer, {cancel, Ref}),
+ gen_server2:reply(From, ok),
+ {stop, cancelled};
{next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps},
{cont, {I + 1, T}};
{'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}};
{'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}}
end.
-master_done({Syncer, Ref, _Log, Parent}, BQS) ->
+master_done({Syncer, Ref, _Log, _HandleInfo, _EmitStats, Parent}, BQS) ->
receive
- {next, Ref} -> unlink(Syncer),
- Syncer ! {done, Ref},
- receive {'EXIT', Syncer, _} -> ok
- after 0 -> ok
- end,
+ {next, Ref} -> stop_syncer(Syncer, {done, Ref}),
{ok, BQS};
{'EXIT', Parent, Reason} -> {shutdown, Reason, BQS};
{'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS}
end.
+stop_syncer(Syncer, Msg) ->
+ unlink(Syncer),
+ Syncer ! Msg,
+ receive {'EXIT', Syncer, _} -> ok
+ after 0 -> ok
+ end.
+
%% Master
%% ---------------------------------------------------------------------------
%% Syncer
@@ -157,6 +171,11 @@ syncer_loop(Ref, MPid, SPids) ->
SPid ! {sync_msg, Ref, Msg, MsgProps}
end || SPid <- SPids1],
syncer_loop(Ref, MPid, SPids1);
+ {cancel, Ref} ->
+ %% We don't tell the slaves we will die - so when we do
+ %% they interpret that as a failure, which is what we
+ %% want.
+ ok;
{done, Ref} ->
[SPid ! {sync_complete, Ref} || SPid <- SPids]
end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 13e8feff..7a28c8a3 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -64,6 +64,10 @@
State#v1.connection_state =:= blocking orelse
State#v1.connection_state =:= blocked)).
+-define(IS_STOPPING(State),
+ (State#v1.connection_state =:= closing orelse
+ State#v1.connection_state =:= closed)).
+
%%--------------------------------------------------------------------------
-ifdef(use_specs).
@@ -323,9 +327,7 @@ handle_other({'DOWN', _MRef, process, ChPid, Reason}, Deb, State) ->
handle_other(terminate_connection, _Deb, State) ->
State;
handle_other(handshake_timeout, Deb, State)
- when ?IS_RUNNING(State) orelse
- State#v1.connection_state =:= closing orelse
- State#v1.connection_state =:= closed ->
+ when ?IS_RUNNING(State) orelse ?IS_STOPPING(State) ->
mainloop(Deb, State);
handle_other(handshake_timeout, _Deb, State) ->
throw({handshake_timeout, State#v1.callback});
@@ -572,17 +574,13 @@ all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
%%--------------------------------------------------------------------------
handle_frame(Type, 0, Payload,
- State = #v1{connection_state = CS,
- connection = #connection{protocol = Protocol}})
- when CS =:= closing; CS =:= closed ->
+ State = #v1{connection = #connection{protocol = Protocol}})
+ when ?IS_STOPPING(State) ->
case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
_Other -> State
end;
-handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS})
- when CS =:= closing; CS =:= closed ->
- State;
handle_frame(Type, 0, Payload,
State = #v1{connection = #connection{protocol = Protocol}}) ->
case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
@@ -600,6 +598,8 @@ handle_frame(Type, Channel, Payload,
heartbeat -> unexpected_frame(Type, Channel, Payload, State);
Frame -> process_frame(Frame, Channel, State)
end;
+handle_frame(_Type, _Channel, _Payload, State) when ?IS_STOPPING(State) ->
+ State;
handle_frame(Type, Channel, Payload, State) ->
unexpected_frame(Type, Channel, Payload, State).
@@ -820,10 +820,9 @@ handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
lists:foreach(fun rabbit_channel:shutdown/1, all_channels()),
maybe_close(State#v1{connection_state = closing});
handle_method0(#'connection.close'{},
- State = #v1{connection_state = CS,
- connection = #connection{protocol = Protocol},
+ State = #v1{connection = #connection{protocol = Protocol},
sock = Sock})
- when CS =:= closing; CS =:= closed ->
+ when ?IS_STOPPING(State) ->
%% We're already closed or closing, so we don't need to cleanup
%% anything.
ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
@@ -832,8 +831,7 @@ handle_method0(#'connection.close_ok'{},
State = #v1{connection_state = closed}) ->
self() ! terminate_connection,
State;
-handle_method0(_Method, State = #v1{connection_state = CS})
- when CS =:= closing; CS =:= closed ->
+handle_method0(_Method, State) when ?IS_STOPPING(State) ->
State;
handle_method0(_Method, #v1{connection_state = S}) ->
rabbit_misc:protocol_error(
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 09ed3d08..b6969d06 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2227,10 +2227,10 @@ variable_queue_publish(IsPersistent, Count, VQ) ->
variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ).
variable_queue_publish(IsPersistent, Count, PropFun, VQ) ->
- variable_queue_publish(IsPersistent, Count, PropFun,
+ variable_queue_publish(IsPersistent, 1, Count, PropFun,
fun (_N) -> <<>> end, VQ).
-variable_queue_publish(IsPersistent, Count, PropFun, PayloadFun, VQ) ->
+variable_queue_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) ->
lists:foldl(
fun (N, VQN) ->
rabbit_variable_queue:publish(
@@ -2242,7 +2242,7 @@ variable_queue_publish(IsPersistent, Count, PropFun, PayloadFun, VQ) ->
end},
PayloadFun(N)),
PropFun(N, #message_properties{}), false, self(), VQN)
- end, VQ, lists:seq(1, Count)).
+ end, VQ, lists:seq(Start, Start + Count - 1)).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
lists:foldl(fun (N, {VQN, AckTagsAcc}) ->
@@ -2327,13 +2327,25 @@ test_variable_queue() ->
passed.
test_variable_queue_fold(VQ0) ->
- Count = rabbit_queue_index:next_segment_boundary(0) * 2 + 64,
+ JustOverTwoSegs = rabbit_queue_index:next_segment_boundary(0) * 2 + 64,
VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
VQ2 = variable_queue_publish(
- true, Count, fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1),
+ true, 1, JustOverTwoSegs,
+ fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1),
+ VQ3 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ2),
+ VQ4 = variable_queue_publish(
+ true, JustOverTwoSegs + 1, 64,
+ fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ3),
+ [false = case V of
+ {delta, _, 0, _} -> true;
+ 0 -> true;
+ _ -> false
+ end || {K, V} <- rabbit_variable_queue:status(VQ4),
+ lists:member(K, [q1, delta, q3])], %% precondition
+ Count = JustOverTwoSegs + 64,
lists:foldl(
- fun (Cut, VQ3) -> test_variable_queue_fold(Cut, Count, VQ3) end,
- VQ2, [0, 1, 2, Count div 2, Count - 1, Count, Count + 1, Count * 2]).
+ fun (Cut, VQ5) -> test_variable_queue_fold(Cut, Count, VQ5) end,
+ VQ4, [0, 1, 2, Count div 2, Count - 1, Count, Count + 1, Count * 2]).
test_variable_queue_fold(Cut, Count, VQ0) ->
{Acc, VQ1} = rabbit_variable_queue:fold(
@@ -2350,31 +2362,72 @@ test_variable_queue_fold(Cut, Count, VQ0) ->
msg2int(#basic_message{content = #content{ payload_fragments_rev = P}}) ->
binary_to_term(list_to_binary(lists:reverse(P))).
-test_variable_queue_requeue(VQ0) ->
- Interval = 50,
- Count = rabbit_queue_index:next_segment_boundary(0) + 2 * Interval,
+ack_subset(AckSeqs, Interval, Rem) ->
+ lists:filter(fun ({_Ack, N}) -> (N + Rem) rem Interval == 0 end, AckSeqs).
+
+requeue_one_by_one(Acks, VQ) ->
+ lists:foldl(fun (AckTag, VQN) ->
+ {_MsgId, VQM} = rabbit_variable_queue:requeue(
+ [AckTag], VQN),
+ VQM
+ end, VQ, Acks).
+
+%% Create a vq with messages in q1, delta, and q3, and holes (in the
+%% form of pending acks) in the latter two.
+variable_queue_with_holes(VQ0) ->
+ Interval = 64,
+ Count = rabbit_queue_index:next_segment_boundary(0)*2 + 2 * Interval,
Seq = lists:seq(1, Count),
VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
- VQ2 = variable_queue_publish(false, Count, VQ1),
- {VQ3, Acks} = variable_queue_fetch(Count, false, false, Count, VQ2),
- Subset = lists:foldl(fun ({Ack, N}, Acc) when N rem Interval == 0 ->
- [Ack | Acc];
- (_, Acc) ->
- Acc
- end, [], lists:zip(Acks, Seq)),
- {_MsgIds, VQ4} = rabbit_variable_queue:requeue(Acks -- Subset, VQ3),
- VQ5 = lists:foldl(fun (AckTag, VQN) ->
- {_MsgId, VQM} = rabbit_variable_queue:requeue(
- [AckTag], VQN),
- VQM
- end, VQ4, Subset),
- VQ6 = lists:foldl(fun (AckTag, VQa) ->
- {{#basic_message{}, true, AckTag}, VQb} =
+ VQ2 = variable_queue_publish(
+ false, 1, Count,
+ fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1),
+ {VQ3, AcksR} = variable_queue_fetch(Count, false, false, Count, VQ2),
+ Acks = lists:reverse(AcksR),
+ AckSeqs = lists:zip(Acks, Seq),
+ [{Subset1, _Seq1}, {Subset2, _Seq2}, {Subset3, Seq3}] =
+ [lists:unzip(ack_subset(AckSeqs, Interval, I)) || I <- [0, 1, 2]],
+ %% we requeue in three phases in order to exercise requeuing logic
+ %% in various vq states
+ {_MsgIds, VQ4} = rabbit_variable_queue:requeue(
+ Acks -- (Subset1 ++ Subset2 ++ Subset3), VQ3),
+ VQ5 = requeue_one_by_one(Subset1, VQ4),
+ %% by now we have some messages (and holes) in delt
+ VQ6 = requeue_one_by_one(Subset2, VQ5),
+ VQ7 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ6),
+ %% add the q1 tail
+ VQ8 = variable_queue_publish(
+ true, Count + 1, 64,
+ fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ7),
+ %% assertions
+ [false = case V of
+ {delta, _, 0, _} -> true;
+ 0 -> true;
+ _ -> false
+ end || {K, V} <- rabbit_variable_queue:status(VQ8),
+ lists:member(K, [q1, delta, q3])],
+ Depth = Count + 64,
+ Depth = rabbit_variable_queue:depth(VQ8),
+ Len = Depth - length(Subset3),
+ Len = rabbit_variable_queue:len(VQ8),
+ {Len, (Seq -- Seq3), lists:seq(Count + 1, Count + 64), VQ8}.
+
+test_variable_queue_requeue(VQ0) ->
+ {_, RequeuedMsgs, FreshMsgs, VQ1} = variable_queue_with_holes(VQ0),
+ Msgs =
+ lists:zip(RequeuedMsgs,
+ lists:duplicate(length(RequeuedMsgs), true)) ++
+ lists:zip(FreshMsgs,
+ lists:duplicate(length(FreshMsgs), false)),
+ VQ2 = lists:foldl(fun ({I, Requeued}, VQa) ->
+ {{M, MRequeued, _}, VQb} =
rabbit_variable_queue:fetch(true, VQa),
+ Requeued = MRequeued, %% assertion
+ I = msg2int(M), %% assertion
VQb
- end, VQ5, lists:reverse(Acks)),
- {empty, VQ7} = rabbit_variable_queue:fetch(true, VQ6),
- VQ7.
+ end, VQ1, Msgs),
+ {empty, VQ3} = rabbit_variable_queue:fetch(true, VQ2),
+ VQ3.
test_variable_queue_ack_limiting(VQ0) ->
%% start by sending in a bunch of messages
@@ -2426,7 +2479,7 @@ test_dropfetchwhile(VQ0) ->
%% add messages with sequential expiry
VQ1 = variable_queue_publish(
- false, Count,
+ false, 1, Count,
fun (N, Props) -> Props#message_properties{expiry = N} end,
fun erlang:term_to_binary/1, VQ0),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 90ee3439..dc32902f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -527,7 +527,6 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
in_counter = InCount,
persistent_count = PCount,
durable = IsDurable,
- ram_msg_count = RamMsgCount,
unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps),
@@ -538,12 +537,12 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
end,
PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- a(reduce_memory_use(State2 #vqstate { next_seq_id = SeqId + 1,
- len = Len + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1,
- ram_msg_count = RamMsgCount + 1,
- unconfirmed = UC1 })).
+ a(reduce_memory_use(
+ inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1,
+ len = Len + 1,
+ in_counter = InCount + 1,
+ persistent_count = PCount1,
+ unconfirmed = UC1 }))).
publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
id = MsgId },
@@ -596,7 +595,7 @@ fetchwhile(Pred, Fun, Acc, State) ->
{undefined, Acc, a(State1)};
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
case Pred(MsgProps) of
- true -> {Msg, State2} = read_msg(MsgStatus, false, State1),
+ true -> {Msg, State2} = read_msg(MsgStatus, State1),
{AckTag, State3} = remove(true, MsgStatus, State2),
fetchwhile(Pred, Fun, Fun(Msg, AckTag, Acc), State3);
false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))}
@@ -610,7 +609,7 @@ fetch(AckRequired, State) ->
{{value, MsgStatus}, State1} ->
%% it is possible that the message wasn't read from disk
%% at this point, so read it in.
- {Msg, State2} = read_msg(MsgStatus, false, State1),
+ {Msg, State2} = read_msg(MsgStatus, State1),
{AckTag, State3} = remove(AckRequired, MsgStatus, State2),
{{Msg, MsgStatus#msg_status.is_delivered, AckTag}, a(State3)}
end.
@@ -672,7 +671,7 @@ ackfold(MsgFun, Acc, State, AckTags) ->
{AccN, StateN} =
lists:foldl(fun(SeqId, {Acc0, State0}) ->
MsgStatus = lookup_pending_ack(SeqId, State0),
- {Msg, State1} = read_msg(MsgStatus, false, State0),
+ {Msg, State1} = read_msg(MsgStatus, State0),
{MsgFun(Msg, SeqId, Acc0), State1}
end, {Acc, State}, AckTags),
{AccN, a(StateN)}.
@@ -684,7 +683,7 @@ fold(Fun, Acc, #vqstate { q1 = Q1,
q3 = Q3,
q4 = Q4 } = State) ->
QFun = fun(MsgStatus, {Acc0, State0}) ->
- {Msg, State1} = read_msg(MsgStatus, false, State0),
+ {Msg, State1} = read_msg(MsgStatus, State0),
{StopGo, AccNext} =
Fun(Msg, MsgStatus#msg_status.msg_props, Acc0),
{StopGo, {AccNext, State1}}
@@ -898,11 +897,25 @@ gb_sets_maybe_insert(false, _Val, Set) -> Set;
gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
msg_status(IsPersistent, IsDelivered, SeqId,
- Msg = #basic_message { id = MsgId }, MsgProps) ->
- #msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg,
- is_persistent = IsPersistent, is_delivered = IsDelivered,
- msg_on_disk = false, index_on_disk = false,
- msg_props = MsgProps }.
+ Msg = #basic_message {id = MsgId}, MsgProps) ->
+ #msg_status{seq_id = SeqId,
+ msg_id = MsgId,
+ msg = Msg,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = false,
+ index_on_disk = false,
+ msg_props = MsgProps}.
+
+beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) ->
+ #msg_status{seq_id = SeqId,
+ msg_id = MsgId,
+ msg = undefined,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = true,
+ index_on_disk = true,
+ msg_props = MsgProps}.
trim_msg_status(MsgStatus) -> MsgStatus #msg_status { msg = undefined }.
@@ -969,7 +982,7 @@ maybe_write_delivered(true, SeqId, IndexState) ->
betas_from_index_entries(List, TransientThreshold, RPA, DPA, IndexState) ->
{Filtered, Delivers, Acks} =
lists:foldr(
- fun ({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered},
+ fun ({_MsgId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
{Filtered1, Delivers1, Acks1} = Acc) ->
case SeqId < TransientThreshold andalso not IsPersistent of
true -> {Filtered1,
@@ -977,21 +990,10 @@ betas_from_index_entries(List, TransientThreshold, RPA, DPA, IndexState) ->
[SeqId | Acks1]};
false -> case (gb_trees:is_defined(SeqId, RPA) orelse
gb_trees:is_defined(SeqId, DPA)) of
- false ->
- {?QUEUE:in_r(
- m(#msg_status {
- seq_id = SeqId,
- msg_id = MsgId,
- msg = undefined,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_on_disk = true,
- index_on_disk = true,
- msg_props = MsgProps
- }), Filtered1),
- Delivers1, Acks1};
- true ->
- Acc
+ false -> {?QUEUE:in_r(m(beta_msg_status(M)),
+ Filtered1),
+ Delivers1, Acks1};
+ true -> Acc
end
end
end, {?QUEUE:new(), [], []}, List),
@@ -1078,9 +1080,10 @@ in_r(MsgStatus = #msg_status { msg = undefined },
case ?QUEUE:is_empty(Q4) of
true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
false -> {Msg, State1 = #vqstate { q4 = Q4a }} =
- read_msg(MsgStatus, true, State),
- State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
- msg = Msg }, Q4a) }
+ read_msg(MsgStatus, State),
+ inc_ram_msg_count(
+ State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
+ msg = Msg }, Q4a) })
end;
in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }.
@@ -1096,19 +1099,19 @@ queue_out(State = #vqstate { q4 = Q4 }) ->
{{value, MsgStatus}, State #vqstate { q4 = Q4a }}
end.
-read_msg(#msg_status { msg = undefined,
- msg_id = MsgId,
- is_persistent = IsPersistent },
- CountDiskToRam, State = #vqstate { ram_msg_count = RamMsgCount,
- msg_store_clients = MSCState}) ->
+read_msg(#msg_status{msg = undefined,
+ msg_id = MsgId,
+ is_persistent = IsPersistent},
+ State = #vqstate{msg_store_clients = MSCState}) ->
{{ok, Msg = #basic_message {}}, MSCState1} =
msg_store_read(MSCState, IsPersistent, MsgId),
- RamMsgCount1 = RamMsgCount + one_if(CountDiskToRam),
- {Msg, State #vqstate { ram_msg_count = RamMsgCount1,
- msg_store_clients = MSCState1 }};
-read_msg(#msg_status { msg = Msg }, _CountDiskToRam, State) ->
+ {Msg, State #vqstate {msg_store_clients = MSCState1}};
+read_msg(#msg_status{msg = Msg}, State) ->
{Msg, State}.
+inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) ->
+ State#vqstate{ram_msg_count = RamMsgCount + 1}.
+
remove(AckRequired, MsgStatus = #msg_status {
seq_id = SeqId,
msg_id = MsgId,
@@ -1122,7 +1125,7 @@ remove(AckRequired, MsgStatus = #msg_status {
index_state = IndexState,
msg_store_clients = MSCState,
len = Len,
- persistent_count = PCount }) ->
+ persistent_count = PCount}) ->
%% 1. Mark it delivered if necessary
IndexState1 = maybe_write_delivered(
IndexOnDisk andalso not IsDelivered,
@@ -1151,11 +1154,11 @@ remove(AckRequired, MsgStatus = #msg_status {
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
- {AckTag, State1 #vqstate { ram_msg_count = RamMsgCount1,
- out_counter = OutCount + 1,
- index_state = IndexState2,
- len = Len - 1,
- persistent_count = PCount1 }}.
+ {AckTag, State1 #vqstate {ram_msg_count = RamMsgCount1,
+ out_counter = OutCount + 1,
+ index_state = IndexState2,
+ len = Len - 1,
+ persistent_count = PCount1}}.
purge_betas_and_deltas(LensByStore,
State = #vqstate { q3 = Q3,
@@ -1390,10 +1393,10 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
%%----------------------------------------------------------------------------
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
- {Msg, State1} = read_msg(MsgStatus, true, State),
- {MsgStatus#msg_status { msg = Msg }, State1};
-publish_alpha(MsgStatus, #vqstate {ram_msg_count = RamMsgCount } = State) ->
- {MsgStatus, State #vqstate { ram_msg_count = RamMsgCount + 1 }}.
+ {Msg, State1} = read_msg(MsgStatus, State),
+ {MsgStatus#msg_status { msg = Msg }, inc_ram_msg_count(State1)};
+publish_alpha(MsgStatus, State) ->
+ {MsgStatus, inc_ram_msg_count(State)}.
publish_beta(MsgStatus, State) ->
{#msg_status { msg = Msg} = MsgStatus1,