diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-01-16 10:56:32 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-01-16 10:56:32 +0000 |
commit | 20d0287cb31e6764e5018e6e18c257090223b252 (patch) | |
tree | 0c9dbde5d02cbcc1bef6ca3c976348df7616611c | |
parent | 88c37f00b352a8720190ad33efde29d5db165be3 (diff) | |
parent | 86f37f1c3cd7102c79880186c7fa5f866f265181 (diff) | |
download | rabbitmq-server-20d0287cb31e6764e5018e6e18c257090223b252.tar.gz |
Merge bug25393
-rwxr-xr-x | check_xref | 14 | ||||
-rw-r--r-- | docs/rabbitmqctl.1.xml | 26 | ||||
-rw-r--r-- | src/gen_server2.erl | 2 | ||||
-rw-r--r-- | src/rabbit_alarm.erl | 17 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 7 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 31 | ||||
-rw-r--r-- | src/rabbit_control_main.erl | 21 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 13 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 45 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 26 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 111 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 111 |
12 files changed, 283 insertions, 141 deletions
@@ -50,6 +50,7 @@ shutdown(Rc, LibDir) -> check(Cwd, PluginsDir, LibDir, Checks) -> {ok, Plugins} = file:list_dir(PluginsDir), ok = file:make_dir(LibDir), + put({?MODULE, third_party}, []), [begin Source = filename:join(PluginsDir, Plugin), Target = filename:join(LibDir, Plugin), @@ -162,7 +163,8 @@ filters() -> filter_chain(FnChain) -> fun(AnalysisResult) -> - lists:foldl(fun(F, false) -> F(cleanup(AnalysisResult)); + Result = cleanup(AnalysisResult), + lists:foldl(fun(F, false) -> F(Result); (_F, true) -> true end, false, FnChain) end. @@ -267,14 +269,8 @@ source_file(M) -> store_third_party(App) -> {ok, AppConfig} = application:get_all_key(App), - case get({?MODULE, third_party}) of - undefined -> - put({?MODULE, third_party}, - proplists:get_value(modules, AppConfig)); - Modules -> - put({?MODULE, third_party}, - proplists:get_value(modules, AppConfig) ++ Modules) - end. + AppModules = proplists:get_value(modules, AppConfig), + put({?MODULE, third_party}, AppModules ++ get({?MODULE, third_party})). %% TODO: this ought not to be maintained in such a fashion external_dependency(Path) -> diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index a95f7b3d..c7069aed 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -476,6 +476,26 @@ </para> </listitem> </varlistentry> + <varlistentry> + <term><cmdsynopsis><command>cancel_sync_queue</command> <arg choice="req">queue</arg></cmdsynopsis> + </term> + <listitem> + <variablelist> + <varlistentry> + <term>queue</term> + <listitem> + <para> + The name of the queue to cancel synchronisation for. + </para> + </listitem> + </varlistentry> + </variablelist> + <para> + Instructs a synchronising mirrored queue to stop + synchronising itself. + </para> + </listitem> + </varlistentry> </variablelist> </refsect2> @@ -1139,6 +1159,12 @@ i.e. those which could take over from the master without message loss.</para></listitem> </varlistentry> + <varlistentry> + <term>status</term> + <listitem><para>The status of the queue. Normally + 'running', but may be "{syncing, MsgCount}" if the queue is + synchronising.</para></listitem> + </varlistentry> </variablelist> <para> If no <command>queueinfoitem</command>s are specified then queue name and depth are diff --git a/src/gen_server2.erl b/src/gen_server2.erl index dc55948b..20de79c2 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -863,7 +863,7 @@ dispatch(Info, Mod, State) -> common_reply(_Name, From, Reply, _NState, [] = _Debug) -> reply(From, Reply), []; -common_reply(Name, {To, Tag} = From, Reply, NState, Debug) -> +common_reply(Name, {To, _Tag} = From, Reply, NState, Debug) -> reply(From, Reply), sys:handle_debug(Debug, fun print_event/3, Name, {out, Reply, To, NState}). diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index d7d4d82a..f813eab8 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -67,9 +67,8 @@ start() -> stop() -> ok. -register(Pid, HighMemMFA) -> - gen_event:call(?SERVER, ?MODULE, {register, Pid, HighMemMFA}, - infinity). +register(Pid, AlertMFA) -> + gen_event:call(?SERVER, ?MODULE, {register, Pid, AlertMFA}, infinity). set_alarm(Alarm) -> gen_event:notify(?SERVER, {set_alarm, Alarm}). clear_alarm(Alarm) -> gen_event:notify(?SERVER, {clear_alarm, Alarm}). @@ -94,9 +93,9 @@ init([]) -> alarmed_nodes = dict:new(), alarms = []}}. -handle_call({register, Pid, HighMemMFA}, State) -> +handle_call({register, Pid, AlertMFA}, State) -> {ok, 0 < dict:size(State#alarms.alarmed_nodes), - internal_register(Pid, HighMemMFA, State)}; + internal_register(Pid, AlertMFA, State)}; handle_call(get_alarms, State = #alarms{alarms = Alarms}) -> {ok, Alarms, State}; @@ -121,8 +120,8 @@ handle_event({node_up, Node}, State) -> handle_event({node_down, Node}, State) -> {ok, maybe_alert(fun dict_unappend_all/3, Node, [], State)}; -handle_event({register, Pid, HighMemMFA}, State) -> - {ok, internal_register(Pid, HighMemMFA, State)}; +handle_event({register, Pid, AlertMFA}, State) -> + {ok, internal_register(Pid, AlertMFA, State)}; handle_event(_Event, State) -> {ok, State}. @@ -198,14 +197,14 @@ alert(Alertees, Source, Alert, NodeComparator) -> end end, ok, Alertees). -internal_register(Pid, {M, F, A} = HighMemMFA, +internal_register(Pid, {M, F, A} = AlertMFA, State = #alarms{alertees = Alertees}) -> _MRef = erlang:monitor(process, Pid), case dict:find(node(), State#alarms.alarmed_nodes) of {ok, Sources} -> [apply(M, F, A ++ [Pid, R, true]) || R <- Sources]; error -> ok end, - NewAlertees = dict:store(Pid, HighMemMFA, Alertees), + NewAlertees = dict:store(Pid, AlertMFA, Alertees), State#alarms{alertees = NewAlertees}. handle_set_alarm({{resource_limit, Source, Node}, []}, State) -> diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 94150f1c..2477b891 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -31,7 +31,8 @@ -export([notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). --export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1]). +-export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1, + cancel_sync_mirrors/1]). %% internal -export([internal_declare/2, internal_delete/1, run_backing_queue/3, @@ -175,6 +176,7 @@ -spec(stop_mirroring/1 :: (pid()) -> 'ok'). -spec(sync_mirrors/1 :: (pid()) -> 'ok' | rabbit_types:error('pending_acks' | 'not_mirrored')). +-spec(cancel_sync_mirrors/1 :: (pid()) -> 'ok' | {'ok', 'not_syncing'}). -endif. @@ -602,7 +604,8 @@ set_maximum_since_use(QPid, Age) -> start_mirroring(QPid) -> ok = delegate:cast(QPid, start_mirroring). stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring). -sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors). +sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors). +cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mirrors). on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2293d001..0a07a005 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -54,7 +54,8 @@ delayed_stop, queue_monitors, dlx, - dlx_routing_key + dlx_routing_key, + status }). -record(consumer, {tag, ack_required}). @@ -97,7 +98,8 @@ memory, slave_pids, synchronised_slave_pids, - backing_queue_status + backing_queue_status, + status ]). -define(CREATION_EVENT_KEYS, @@ -149,7 +151,8 @@ init_state(Q) -> publish_seqno = 1, unconfirmed = dtree:empty(), queue_monitors = pmon:new(), - msg_id_to_channel = gb_trees:empty()}, + msg_id_to_channel = gb_trees:empty(), + status = running}, rabbit_event:init_stats_timer(State, #q.stats_timer). terminate(shutdown = R, State = #q{backing_queue = BQ}) -> @@ -938,6 +941,8 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) -> false -> ''; true -> SSPids end; +i(status, #q{status = Status}) -> + Status; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); i(Item, _) -> @@ -1161,8 +1166,22 @@ handle_call(sync_mirrors, _From, State = #q{backing_queue = rabbit_mirror_queue_master = BQ, backing_queue_state = BQS}) -> S = fun(BQSN) -> State#q{backing_queue_state = BQSN} end, + HandleInfo = fun (Status) -> + receive {'$gen_call', From, {info, Items}} -> + Infos = infos(Items, State#q{status = Status}), + gen_server2:reply(From, {ok, Infos}) + after 0 -> + ok + end + end, + EmitStats = fun (Status) -> + rabbit_event:if_enabled( + State, #q.stats_timer, + fun() -> emit_stats(State#q{status = Status}) end) + end, case BQ:depth(BQS) - BQ:len(BQS) of - 0 -> case rabbit_mirror_queue_master:sync_mirrors(BQS) of + 0 -> case rabbit_mirror_queue_master:sync_mirrors( + HandleInfo, EmitStats, BQS) of {ok, BQS1} -> reply(ok, S(BQS1)); {stop, Reason, BQS1} -> {stop, Reason, S(BQS1)} end; @@ -1172,6 +1191,10 @@ handle_call(sync_mirrors, _From, handle_call(sync_mirrors, _From, State) -> reply({error, not_mirrored}, State); +%% By definition if we get this message here we do not have to do anything. +handle_call(cancel_sync_mirrors, _From, State) -> + reply({ok, not_syncing}, State); + handle_call(force_event_refresh, _From, State = #q{exclusive_consumer = Exclusive}) -> rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 6ccd5011..fc9c41a4 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -17,7 +17,7 @@ -module(rabbit_control_main). -include("rabbit.hrl"). --export([start/0, stop/0, action/5, sync_queue/1]). +-export([start/0, stop/0, action/5, sync_queue/1, cancel_sync_queue/1]). -define(RPC_TIMEOUT, infinity). -define(EXTERNAL_CHECK_INTERVAL, 1000). @@ -51,6 +51,7 @@ {forget_cluster_node, [?OFFLINE_DEF]}, cluster_status, {sync_queue, [?VHOST_DEF]}, + {cancel_sync_queue, [?VHOST_DEF]}, add_user, delete_user, @@ -160,6 +161,12 @@ start() -> false -> io:format("...done.~n") end, rabbit_misc:quit(0); + {ok, Info} -> + case Quiet of + true -> ok; + false -> io:format("...done (~p).~n", [Info]) + end, + rabbit_misc:quit(0); {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> %% < R15 PrintInvalidCommandError(), usage(); @@ -287,6 +294,12 @@ action(sync_queue, Node, [Q], Opts, Inform) -> Inform("Synchronising ~s", [rabbit_misc:rs(QName)]), rpc_call(Node, rabbit_control_main, sync_queue, [QName]); +action(cancel_sync_queue, Node, [Q], Opts, Inform) -> + VHost = proplists:get_value(?VHOST_OPT, Opts), + QName = rabbit_misc:r(list_to_binary(VHost), queue, list_to_binary(Q)), + Inform("Stopping synchronising ~s", [rabbit_misc:rs(QName)]), + rpc_call(Node, rabbit_control_main, cancel_sync_queue, [QName]); + action(wait, Node, [PidFile], _Opts, Inform) -> Inform("Waiting for ~p", [Node]), wait_for_application(Node, PidFile, rabbit_and_plugins, Inform); @@ -524,6 +537,12 @@ sync_queue(Q) -> rabbit_amqqueue:with( Q, fun(#amqqueue{pid = QPid}) -> rabbit_amqqueue:sync_mirrors(QPid) end). +cancel_sync_queue(Q) -> + rabbit_amqqueue:with( + Q, fun(#amqqueue{pid = QPid}) -> + rabbit_amqqueue:cancel_sync_mirrors(QPid) + end). + %%---------------------------------------------------------------------------- wait_for_application(Node, PidFile, Application, Inform) -> diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 0df7ea1c..b5f72cad 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -28,7 +28,7 @@ -export([promote_backing_queue_state/8, sender_death_fun/0, depth_fun/0]). --export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/1]). +-export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/3]). -behaviour(rabbit_backing_queue). @@ -46,10 +46,11 @@ -ifdef(use_specs). --export_type([death_fun/0, depth_fun/0]). +-export_type([death_fun/0, depth_fun/0, stats_fun/0]). -type(death_fun() :: fun ((pid()) -> 'ok')). -type(depth_fun() :: fun (() -> 'ok')). +-type(stats_fun() :: fun ((any()) -> 'ok')). -type(master_state() :: #state { name :: rabbit_amqqueue:name(), gm :: pid(), coordinator :: pid(), @@ -68,7 +69,7 @@ -spec(init_with_existing_bq/3 :: (rabbit_types:amqqueue(), atom(), any()) -> master_state()). -spec(stop_mirroring/1 :: (master_state()) -> {atom(), any()}). --spec(sync_mirrors/1 :: (master_state()) -> +-spec(sync_mirrors/3 :: (stats_fun(), stats_fun(), master_state()) -> {'ok', master_state()} | {stop, any(), master_state()}). -endif. @@ -126,7 +127,8 @@ stop_mirroring(State = #state { coordinator = CPid, stop_all_slaves(shutdown, State), {BQ, BQS}. -sync_mirrors(State = #state { name = QName, +sync_mirrors(HandleInfo, EmitStats, + State = #state { name = QName, gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> @@ -140,7 +142,8 @@ sync_mirrors(State = #state { name = QName, Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, Log, SPids), gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}), S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end, - case rabbit_mirror_queue_sync:master_go(Syncer, Ref, Log, BQ, BQS) of + case rabbit_mirror_queue_sync:master_go( + Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) of {shutdown, R, BQS1} -> {stop, R, S(BQS1)}; {sync_died, R, BQS1} -> Log("~p", [R]), {ok, S(BQS1)}; diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index 508d46e9..f2ab67cd 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). --export([master_prepare/3, master_go/5, slave/7]). +-export([master_prepare/3, master_go/7, slave/7]). -define(SYNC_PROGRESS_INTERVAL, 1000000). @@ -59,7 +59,10 @@ -type(bqs() :: any()). -spec(master_prepare/3 :: (reference(), log_fun(), [pid()]) -> pid()). --spec(master_go/5 :: (pid(), reference(), log_fun(), bq(), bqs()) -> +-spec(master_go/7 :: (pid(), reference(), log_fun(), + rabbit_mirror_queue_master:stats_fun(), + rabbit_mirror_queue_master:stats_fun(), + bq(), bqs()) -> {'already_synced', bqs()} | {'ok', bqs()} | {'shutdown', any(), bqs()} | {'sync_died', any(), bqs()}). @@ -78,12 +81,13 @@ master_prepare(Ref, Log, SPids) -> MPid = self(), spawn_link(fun () -> syncer(Ref, Log, MPid, SPids) end). -master_go(Syncer, Ref, Log, BQ, BQS) -> - Args = {Syncer, Ref, Log, rabbit_misc:get_parent()}, +master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) -> + Args = {Syncer, Ref, Log, HandleInfo, EmitStats, rabbit_misc:get_parent()}, receive {'EXIT', Syncer, normal} -> {already_synced, BQS}; {'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS}; - {ready, Syncer} -> master_go0(Args, BQ, BQS) + {ready, Syncer} -> EmitStats({syncing, 0}), + master_go0(Args, BQ, BQS) end. master_go0(Args, BQ, BQS) -> @@ -95,12 +99,15 @@ master_go0(Args, BQ, BQS) -> {_, BQS1} -> master_done(Args, BQS1) end. -master_send(Msg, MsgProps, {Syncer, Ref, Log, Parent}, {I, Last}) -> +master_send(Msg, MsgProps, {Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, + {I, Last}) -> T = case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of - true -> Log("~p messages", [I]), + true -> EmitStats({syncing, I}), + Log("~p messages", [I]), erlang:now(); false -> Last end, + HandleInfo({syncing, I}), receive {'$gen_cast', {set_maximum_since_use, Age}} -> ok = file_handle_cache:set_maximum_since_use(Age) @@ -108,24 +115,31 @@ master_send(Msg, MsgProps, {Syncer, Ref, Log, Parent}, {I, Last}) -> ok end, receive + {'$gen_call', From, + cancel_sync_mirrors} -> stop_syncer(Syncer, {cancel, Ref}), + gen_server2:reply(From, ok), + {stop, cancelled}; {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps}, {cont, {I + 1, T}}; {'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}}; {'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}} end. -master_done({Syncer, Ref, _Log, Parent}, BQS) -> +master_done({Syncer, Ref, _Log, _HandleInfo, _EmitStats, Parent}, BQS) -> receive - {next, Ref} -> unlink(Syncer), - Syncer ! {done, Ref}, - receive {'EXIT', Syncer, _} -> ok - after 0 -> ok - end, + {next, Ref} -> stop_syncer(Syncer, {done, Ref}), {ok, BQS}; {'EXIT', Parent, Reason} -> {shutdown, Reason, BQS}; {'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS} end. +stop_syncer(Syncer, Msg) -> + unlink(Syncer), + Syncer ! Msg, + receive {'EXIT', Syncer, _} -> ok + after 0 -> ok + end. + %% Master %% --------------------------------------------------------------------------- %% Syncer @@ -157,6 +171,11 @@ syncer_loop(Ref, MPid, SPids) -> SPid ! {sync_msg, Ref, Msg, MsgProps} end || SPid <- SPids1], syncer_loop(Ref, MPid, SPids1); + {cancel, Ref} -> + %% We don't tell the slaves we will die - so when we do + %% they interpret that as a failure, which is what we + %% want. + ok; {done, Ref} -> [SPid ! {sync_complete, Ref} || SPid <- SPids] end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 13e8feff..7a28c8a3 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -64,6 +64,10 @@ State#v1.connection_state =:= blocking orelse State#v1.connection_state =:= blocked)). +-define(IS_STOPPING(State), + (State#v1.connection_state =:= closing orelse + State#v1.connection_state =:= closed)). + %%-------------------------------------------------------------------------- -ifdef(use_specs). @@ -323,9 +327,7 @@ handle_other({'DOWN', _MRef, process, ChPid, Reason}, Deb, State) -> handle_other(terminate_connection, _Deb, State) -> State; handle_other(handshake_timeout, Deb, State) - when ?IS_RUNNING(State) orelse - State#v1.connection_state =:= closing orelse - State#v1.connection_state =:= closed -> + when ?IS_RUNNING(State) orelse ?IS_STOPPING(State) -> mainloop(Deb, State); handle_other(handshake_timeout, _Deb, State) -> throw({handshake_timeout, State#v1.callback}); @@ -572,17 +574,13 @@ all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()]. %%-------------------------------------------------------------------------- handle_frame(Type, 0, Payload, - State = #v1{connection_state = CS, - connection = #connection{protocol = Protocol}}) - when CS =:= closing; CS =:= closed -> + State = #v1{connection = #connection{protocol = Protocol}}) + when ?IS_STOPPING(State) -> case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of {method, MethodName, FieldsBin} -> handle_method0(MethodName, FieldsBin, State); _Other -> State end; -handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS}) - when CS =:= closing; CS =:= closed -> - State; handle_frame(Type, 0, Payload, State = #v1{connection = #connection{protocol = Protocol}}) -> case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of @@ -600,6 +598,8 @@ handle_frame(Type, Channel, Payload, heartbeat -> unexpected_frame(Type, Channel, Payload, State); Frame -> process_frame(Frame, Channel, State) end; +handle_frame(_Type, _Channel, _Payload, State) when ?IS_STOPPING(State) -> + State; handle_frame(Type, Channel, Payload, State) -> unexpected_frame(Type, Channel, Payload, State). @@ -820,10 +820,9 @@ handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> lists:foreach(fun rabbit_channel:shutdown/1, all_channels()), maybe_close(State#v1{connection_state = closing}); handle_method0(#'connection.close'{}, - State = #v1{connection_state = CS, - connection = #connection{protocol = Protocol}, + State = #v1{connection = #connection{protocol = Protocol}, sock = Sock}) - when CS =:= closing; CS =:= closed -> + when ?IS_STOPPING(State) -> %% We're already closed or closing, so we don't need to cleanup %% anything. ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol), @@ -832,8 +831,7 @@ handle_method0(#'connection.close_ok'{}, State = #v1{connection_state = closed}) -> self() ! terminate_connection, State; -handle_method0(_Method, State = #v1{connection_state = CS}) - when CS =:= closing; CS =:= closed -> +handle_method0(_Method, State) when ?IS_STOPPING(State) -> State; handle_method0(_Method, #v1{connection_state = S}) -> rabbit_misc:protocol_error( diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 09ed3d08..b6969d06 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2227,10 +2227,10 @@ variable_queue_publish(IsPersistent, Count, VQ) -> variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ). variable_queue_publish(IsPersistent, Count, PropFun, VQ) -> - variable_queue_publish(IsPersistent, Count, PropFun, + variable_queue_publish(IsPersistent, 1, Count, PropFun, fun (_N) -> <<>> end, VQ). -variable_queue_publish(IsPersistent, Count, PropFun, PayloadFun, VQ) -> +variable_queue_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) -> lists:foldl( fun (N, VQN) -> rabbit_variable_queue:publish( @@ -2242,7 +2242,7 @@ variable_queue_publish(IsPersistent, Count, PropFun, PayloadFun, VQ) -> end}, PayloadFun(N)), PropFun(N, #message_properties{}), false, self(), VQN) - end, VQ, lists:seq(1, Count)). + end, VQ, lists:seq(Start, Start + Count - 1)). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> lists:foldl(fun (N, {VQN, AckTagsAcc}) -> @@ -2327,13 +2327,25 @@ test_variable_queue() -> passed. test_variable_queue_fold(VQ0) -> - Count = rabbit_queue_index:next_segment_boundary(0) * 2 + 64, + JustOverTwoSegs = rabbit_queue_index:next_segment_boundary(0) * 2 + 64, VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), VQ2 = variable_queue_publish( - true, Count, fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1), + true, 1, JustOverTwoSegs, + fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1), + VQ3 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ2), + VQ4 = variable_queue_publish( + true, JustOverTwoSegs + 1, 64, + fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ3), + [false = case V of + {delta, _, 0, _} -> true; + 0 -> true; + _ -> false + end || {K, V} <- rabbit_variable_queue:status(VQ4), + lists:member(K, [q1, delta, q3])], %% precondition + Count = JustOverTwoSegs + 64, lists:foldl( - fun (Cut, VQ3) -> test_variable_queue_fold(Cut, Count, VQ3) end, - VQ2, [0, 1, 2, Count div 2, Count - 1, Count, Count + 1, Count * 2]). + fun (Cut, VQ5) -> test_variable_queue_fold(Cut, Count, VQ5) end, + VQ4, [0, 1, 2, Count div 2, Count - 1, Count, Count + 1, Count * 2]). test_variable_queue_fold(Cut, Count, VQ0) -> {Acc, VQ1} = rabbit_variable_queue:fold( @@ -2350,31 +2362,72 @@ test_variable_queue_fold(Cut, Count, VQ0) -> msg2int(#basic_message{content = #content{ payload_fragments_rev = P}}) -> binary_to_term(list_to_binary(lists:reverse(P))). -test_variable_queue_requeue(VQ0) -> - Interval = 50, - Count = rabbit_queue_index:next_segment_boundary(0) + 2 * Interval, +ack_subset(AckSeqs, Interval, Rem) -> + lists:filter(fun ({_Ack, N}) -> (N + Rem) rem Interval == 0 end, AckSeqs). + +requeue_one_by_one(Acks, VQ) -> + lists:foldl(fun (AckTag, VQN) -> + {_MsgId, VQM} = rabbit_variable_queue:requeue( + [AckTag], VQN), + VQM + end, VQ, Acks). + +%% Create a vq with messages in q1, delta, and q3, and holes (in the +%% form of pending acks) in the latter two. +variable_queue_with_holes(VQ0) -> + Interval = 64, + Count = rabbit_queue_index:next_segment_boundary(0)*2 + 2 * Interval, Seq = lists:seq(1, Count), VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), - VQ2 = variable_queue_publish(false, Count, VQ1), - {VQ3, Acks} = variable_queue_fetch(Count, false, false, Count, VQ2), - Subset = lists:foldl(fun ({Ack, N}, Acc) when N rem Interval == 0 -> - [Ack | Acc]; - (_, Acc) -> - Acc - end, [], lists:zip(Acks, Seq)), - {_MsgIds, VQ4} = rabbit_variable_queue:requeue(Acks -- Subset, VQ3), - VQ5 = lists:foldl(fun (AckTag, VQN) -> - {_MsgId, VQM} = rabbit_variable_queue:requeue( - [AckTag], VQN), - VQM - end, VQ4, Subset), - VQ6 = lists:foldl(fun (AckTag, VQa) -> - {{#basic_message{}, true, AckTag}, VQb} = + VQ2 = variable_queue_publish( + false, 1, Count, + fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1), + {VQ3, AcksR} = variable_queue_fetch(Count, false, false, Count, VQ2), + Acks = lists:reverse(AcksR), + AckSeqs = lists:zip(Acks, Seq), + [{Subset1, _Seq1}, {Subset2, _Seq2}, {Subset3, Seq3}] = + [lists:unzip(ack_subset(AckSeqs, Interval, I)) || I <- [0, 1, 2]], + %% we requeue in three phases in order to exercise requeuing logic + %% in various vq states + {_MsgIds, VQ4} = rabbit_variable_queue:requeue( + Acks -- (Subset1 ++ Subset2 ++ Subset3), VQ3), + VQ5 = requeue_one_by_one(Subset1, VQ4), + %% by now we have some messages (and holes) in delt + VQ6 = requeue_one_by_one(Subset2, VQ5), + VQ7 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ6), + %% add the q1 tail + VQ8 = variable_queue_publish( + true, Count + 1, 64, + fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ7), + %% assertions + [false = case V of + {delta, _, 0, _} -> true; + 0 -> true; + _ -> false + end || {K, V} <- rabbit_variable_queue:status(VQ8), + lists:member(K, [q1, delta, q3])], + Depth = Count + 64, + Depth = rabbit_variable_queue:depth(VQ8), + Len = Depth - length(Subset3), + Len = rabbit_variable_queue:len(VQ8), + {Len, (Seq -- Seq3), lists:seq(Count + 1, Count + 64), VQ8}. + +test_variable_queue_requeue(VQ0) -> + {_, RequeuedMsgs, FreshMsgs, VQ1} = variable_queue_with_holes(VQ0), + Msgs = + lists:zip(RequeuedMsgs, + lists:duplicate(length(RequeuedMsgs), true)) ++ + lists:zip(FreshMsgs, + lists:duplicate(length(FreshMsgs), false)), + VQ2 = lists:foldl(fun ({I, Requeued}, VQa) -> + {{M, MRequeued, _}, VQb} = rabbit_variable_queue:fetch(true, VQa), + Requeued = MRequeued, %% assertion + I = msg2int(M), %% assertion VQb - end, VQ5, lists:reverse(Acks)), - {empty, VQ7} = rabbit_variable_queue:fetch(true, VQ6), - VQ7. + end, VQ1, Msgs), + {empty, VQ3} = rabbit_variable_queue:fetch(true, VQ2), + VQ3. test_variable_queue_ack_limiting(VQ0) -> %% start by sending in a bunch of messages @@ -2426,7 +2479,7 @@ test_dropfetchwhile(VQ0) -> %% add messages with sequential expiry VQ1 = variable_queue_publish( - false, Count, + false, 1, Count, fun (N, Props) -> Props#message_properties{expiry = N} end, fun erlang:term_to_binary/1, VQ0), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 90ee3439..dc32902f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -527,7 +527,6 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, in_counter = InCount, persistent_count = PCount, durable = IsDurable, - ram_msg_count = RamMsgCount, unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps), @@ -538,12 +537,12 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, end, PCount1 = PCount + one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - a(reduce_memory_use(State2 #vqstate { next_seq_id = SeqId + 1, - len = Len + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - ram_msg_count = RamMsgCount + 1, - unconfirmed = UC1 })). + a(reduce_memory_use( + inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1, + len = Len + 1, + in_counter = InCount + 1, + persistent_count = PCount1, + unconfirmed = UC1 }))). publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, @@ -596,7 +595,7 @@ fetchwhile(Pred, Fun, Acc, State) -> {undefined, Acc, a(State1)}; {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> case Pred(MsgProps) of - true -> {Msg, State2} = read_msg(MsgStatus, false, State1), + true -> {Msg, State2} = read_msg(MsgStatus, State1), {AckTag, State3} = remove(true, MsgStatus, State2), fetchwhile(Pred, Fun, Fun(Msg, AckTag, Acc), State3); false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))} @@ -610,7 +609,7 @@ fetch(AckRequired, State) -> {{value, MsgStatus}, State1} -> %% it is possible that the message wasn't read from disk %% at this point, so read it in. - {Msg, State2} = read_msg(MsgStatus, false, State1), + {Msg, State2} = read_msg(MsgStatus, State1), {AckTag, State3} = remove(AckRequired, MsgStatus, State2), {{Msg, MsgStatus#msg_status.is_delivered, AckTag}, a(State3)} end. @@ -672,7 +671,7 @@ ackfold(MsgFun, Acc, State, AckTags) -> {AccN, StateN} = lists:foldl(fun(SeqId, {Acc0, State0}) -> MsgStatus = lookup_pending_ack(SeqId, State0), - {Msg, State1} = read_msg(MsgStatus, false, State0), + {Msg, State1} = read_msg(MsgStatus, State0), {MsgFun(Msg, SeqId, Acc0), State1} end, {Acc, State}, AckTags), {AccN, a(StateN)}. @@ -684,7 +683,7 @@ fold(Fun, Acc, #vqstate { q1 = Q1, q3 = Q3, q4 = Q4 } = State) -> QFun = fun(MsgStatus, {Acc0, State0}) -> - {Msg, State1} = read_msg(MsgStatus, false, State0), + {Msg, State1} = read_msg(MsgStatus, State0), {StopGo, AccNext} = Fun(Msg, MsgStatus#msg_status.msg_props, Acc0), {StopGo, {AccNext, State1}} @@ -898,11 +897,25 @@ gb_sets_maybe_insert(false, _Val, Set) -> Set; gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set). msg_status(IsPersistent, IsDelivered, SeqId, - Msg = #basic_message { id = MsgId }, MsgProps) -> - #msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg, - is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_on_disk = false, index_on_disk = false, - msg_props = MsgProps }. + Msg = #basic_message {id = MsgId}, MsgProps) -> + #msg_status{seq_id = SeqId, + msg_id = MsgId, + msg = Msg, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_on_disk = false, + index_on_disk = false, + msg_props = MsgProps}. + +beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) -> + #msg_status{seq_id = SeqId, + msg_id = MsgId, + msg = undefined, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_on_disk = true, + index_on_disk = true, + msg_props = MsgProps}. trim_msg_status(MsgStatus) -> MsgStatus #msg_status { msg = undefined }. @@ -969,7 +982,7 @@ maybe_write_delivered(true, SeqId, IndexState) -> betas_from_index_entries(List, TransientThreshold, RPA, DPA, IndexState) -> {Filtered, Delivers, Acks} = lists:foldr( - fun ({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}, + fun ({_MsgId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M, {Filtered1, Delivers1, Acks1} = Acc) -> case SeqId < TransientThreshold andalso not IsPersistent of true -> {Filtered1, @@ -977,21 +990,10 @@ betas_from_index_entries(List, TransientThreshold, RPA, DPA, IndexState) -> [SeqId | Acks1]}; false -> case (gb_trees:is_defined(SeqId, RPA) orelse gb_trees:is_defined(SeqId, DPA)) of - false -> - {?QUEUE:in_r( - m(#msg_status { - seq_id = SeqId, - msg_id = MsgId, - msg = undefined, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_on_disk = true, - index_on_disk = true, - msg_props = MsgProps - }), Filtered1), - Delivers1, Acks1}; - true -> - Acc + false -> {?QUEUE:in_r(m(beta_msg_status(M)), + Filtered1), + Delivers1, Acks1}; + true -> Acc end end end, {?QUEUE:new(), [], []}, List), @@ -1078,9 +1080,10 @@ in_r(MsgStatus = #msg_status { msg = undefined }, case ?QUEUE:is_empty(Q4) of true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }; false -> {Msg, State1 = #vqstate { q4 = Q4a }} = - read_msg(MsgStatus, true, State), - State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status { - msg = Msg }, Q4a) } + read_msg(MsgStatus, State), + inc_ram_msg_count( + State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status { + msg = Msg }, Q4a) }) end; in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }. @@ -1096,19 +1099,19 @@ queue_out(State = #vqstate { q4 = Q4 }) -> {{value, MsgStatus}, State #vqstate { q4 = Q4a }} end. -read_msg(#msg_status { msg = undefined, - msg_id = MsgId, - is_persistent = IsPersistent }, - CountDiskToRam, State = #vqstate { ram_msg_count = RamMsgCount, - msg_store_clients = MSCState}) -> +read_msg(#msg_status{msg = undefined, + msg_id = MsgId, + is_persistent = IsPersistent}, + State = #vqstate{msg_store_clients = MSCState}) -> {{ok, Msg = #basic_message {}}, MSCState1} = msg_store_read(MSCState, IsPersistent, MsgId), - RamMsgCount1 = RamMsgCount + one_if(CountDiskToRam), - {Msg, State #vqstate { ram_msg_count = RamMsgCount1, - msg_store_clients = MSCState1 }}; -read_msg(#msg_status { msg = Msg }, _CountDiskToRam, State) -> + {Msg, State #vqstate {msg_store_clients = MSCState1}}; +read_msg(#msg_status{msg = Msg}, State) -> {Msg, State}. +inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) -> + State#vqstate{ram_msg_count = RamMsgCount + 1}. + remove(AckRequired, MsgStatus = #msg_status { seq_id = SeqId, msg_id = MsgId, @@ -1122,7 +1125,7 @@ remove(AckRequired, MsgStatus = #msg_status { index_state = IndexState, msg_store_clients = MSCState, len = Len, - persistent_count = PCount }) -> + persistent_count = PCount}) -> %% 1. Mark it delivered if necessary IndexState1 = maybe_write_delivered( IndexOnDisk andalso not IsDelivered, @@ -1151,11 +1154,11 @@ remove(AckRequired, MsgStatus = #msg_status { PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), - {AckTag, State1 #vqstate { ram_msg_count = RamMsgCount1, - out_counter = OutCount + 1, - index_state = IndexState2, - len = Len - 1, - persistent_count = PCount1 }}. + {AckTag, State1 #vqstate {ram_msg_count = RamMsgCount1, + out_counter = OutCount + 1, + index_state = IndexState2, + len = Len - 1, + persistent_count = PCount1}}. purge_betas_and_deltas(LensByStore, State = #vqstate { q3 = Q3, @@ -1390,10 +1393,10 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> %%---------------------------------------------------------------------------- publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> - {Msg, State1} = read_msg(MsgStatus, true, State), - {MsgStatus#msg_status { msg = Msg }, State1}; -publish_alpha(MsgStatus, #vqstate {ram_msg_count = RamMsgCount } = State) -> - {MsgStatus, State #vqstate { ram_msg_count = RamMsgCount + 1 }}. + {Msg, State1} = read_msg(MsgStatus, State), + {MsgStatus#msg_status { msg = Msg }, inc_ram_msg_count(State1)}; +publish_alpha(MsgStatus, State) -> + {MsgStatus, inc_ram_msg_count(State)}. publish_beta(MsgStatus, State) -> {#msg_status { msg = Msg} = MsgStatus1, |