diff options
-rw-r--r-- | docs/rabbitmqctl.1.xml | 12 | ||||
-rw-r--r-- | include/rabbit.hrl | 3 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 87 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 11 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 3 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 3 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 18 |
7 files changed, 99 insertions, 38 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index eb3c7ef3..2b70587a 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1253,10 +1253,14 @@ message loss.</para></listitem> </varlistentry> <varlistentry> - <term>status</term> - <listitem><para>The status of the queue. Normally - 'running', but may be "{syncing, MsgCount}" if the queue is - synchronising.</para></listitem> + <term>state</term> + <listitem><para>The state of the queue. Normally + 'running', but may be "{syncing, MsgCount}" if the + queue is synchronising. Queues which are located on + cluster nodes that are currently down will be shown + with a status of 'down' (and most other + <command>queueinfoitem</command>s will be + unavailable).</para></listitem> </varlistentry> </variablelist> <para> diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 5e41ea93..74e165cd 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -55,7 +55,8 @@ down_slave_nodes, %% durable policy, %% durable, implicit update as above gm_pids, %% transient - decorators}). %% transient, recalculated as above + decorators, %% transient, recalculated as above + state}). %% durable (have we crashed?) -record(exchange_serial, {name, next}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 63a9529a..66f04381 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -25,6 +25,7 @@ check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). +-export([list_down/1]). -export([force_event_refresh/1, notify_policy_changed/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). -export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]). @@ -108,6 +109,7 @@ (name(), pid(), qfun(A)) -> A | rabbit_types:channel_exit()). -spec(list/0 :: () -> [rabbit_types:amqqueue()]). -spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:amqqueue()]). +-spec(list_down/1 :: (rabbit_types:vhost()) -> [rabbit_types:amqqueue()]). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (rabbit_types:amqqueue()) -> rabbit_types:infos()). -spec(info/2 :: @@ -272,7 +274,8 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> slave_pids = [], sync_slave_pids = [], down_slave_nodes = [], - gm_pids = []})), + gm_pids = [], + state = live})), Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node), gen_server2:call( rabbit_amqqueue_sup_sup:start_queue_process(Node, Q, declare), @@ -280,7 +283,10 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> internal_declare(Q, true) -> rabbit_misc:execute_mnesia_tx_with_tail( - fun () -> ok = store_queue(Q), rabbit_misc:const(Q) end); + fun () -> + ok = store_queue(Q#amqqueue{state = live}), + rabbit_misc:const(Q) + end); internal_declare(Q = #amqqueue{name = QueueName}, false) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> @@ -288,7 +294,8 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> [] -> case not_found_or_absent(QueueName) of not_found -> Q1 = rabbit_policy:set(Q), - ok = store_queue(Q1), + Q2 = Q1#amqqueue{state = live}, + ok = store_queue(Q2), B = add_default_binding(Q1), fun () -> B(), Q1 end; {absent, _Q, _} = R -> rabbit_misc:const(R) @@ -381,6 +388,8 @@ not_found_or_absent_dirty(Name) -> with(Name, F, E) -> case lookup(Name) of + {ok, Q = #amqqueue{state = crashed}} -> + E({absent, Q, crashed}); {ok, Q = #amqqueue{pid = QPid}} -> %% We check is_process_alive(QPid) in case we receive a %% nodedown (for example) in F() that has nothing to do @@ -390,11 +399,8 @@ with(Name, F, E) -> %% the retry loop. rabbit_misc:with_exit_handler( fun () -> false = rabbit_misc:is_process_alive(QPid), - case crashed_or_recovering(Q) of - crashed -> E({absent, Q, crashed}); - recovering -> timer:sleep(25), - with(Name, F, E) - end + timer:sleep(25), + with(Name, F, E) end, fun () -> F(Q) end); {error, not_found} -> E(not_found_or_absent_dirty(Name)) @@ -407,20 +413,6 @@ with_or_die(Name, F) -> ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason) end). -%% TODO we could say we are crashed when we mean recovering if we -%% happen to call in the middle of a crash-failover. We could try to -%% figure out whether that's happening by looking for the supervisor - -%% but we'd need some additional book keeping to know what it is. And -%% it will just mean a temporary glitch while crashing, which is -%% fairly tolerable. -crashed_or_recovering(#amqqueue{pid = QPid, slave_pids = []}) -> - case lists:member(node(QPid), [node() | nodes()]) of - true -> crashed; - false -> recovering - end; -crashed_or_recovering(_Q) -> - recovering. - assert_equivalence(#amqqueue{name = QName, durable = Durable, auto_delete = AD} = Q, @@ -520,32 +512,68 @@ check_dlxrk_arg({Type, _}, _Args) -> list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}). +list(VHostPath) -> list(VHostPath, rabbit_queue). + %% Not dirty_match_object since that would not be transactional when used in a %% tx context -list(VHostPath) -> +list(VHostPath, TableName) -> mnesia:async_dirty( fun () -> mnesia:match_object( - rabbit_queue, + TableName, #amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'}, read) end). +list_down(VHostPath) -> + Present = list(VHostPath), + Durable = list(VHostPath, rabbit_durable_queue), + PresentS = sets:from_list([N || #amqqueue{name = N} <- Present]), + sets:to_list(sets:filter(fun (#amqqueue{name = N}) -> + not sets:is_element(N, PresentS) + end, sets:from_list(Durable))). + info_keys() -> rabbit_amqqueue_process:info_keys(). -map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). +map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs). +info(Q = #amqqueue{ state = crashed }) -> info_down(Q, crashed); info(#amqqueue{ pid = QPid }) -> delegate:call(QPid, info). +info(Q = #amqqueue{ state = crashed }, Items) -> + info_down(Q, Items, crashed); info(#amqqueue{ pid = QPid }, Items) -> case delegate:call(QPid, {info, Items}) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. -info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). +info_down(Q, DownReason) -> + info_down(Q, rabbit_amqqueue_process:info_keys(), DownReason). + +info_down(Q, Items, DownReason) -> + [{Item, i_down(Item, Q, DownReason)} || Item <- Items]. + +i_down(name, #amqqueue{name = Name}, _) -> Name; +i_down(durable, #amqqueue{durable = Durable},_) -> Durable; +i_down(auto_delete, #amqqueue{auto_delete = AD}, _) -> AD; +i_down(arguments, #amqqueue{arguments = Args}, _) -> Args; +i_down(pid, #amqqueue{pid = QPid}, _) -> QPid; +i_down(down_slave_nodes, #amqqueue{down_slave_nodes = DSN}, _) -> DSN; +i_down(state, _Q, DownReason) -> DownReason; +i_down(K, _Q, _DownReason) -> + case lists:member(K, rabbit_amqqueue_process:info_keys()) of + true -> ''; + false -> throw({bad_argument, K}) + end. + +info_all(VHostPath) -> + map(list(VHostPath), fun (Q) -> info(Q) end) ++ + map(list_down(VHostPath), fun (Q) -> info_down(Q, down) end). -info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). +info_all(VHostPath, Items) -> + map(list(VHostPath), fun (Q) -> info(Q, Items) end) ++ + map(list_down(VHostPath), fun (Q) -> info_down(Q, Items, down) end). force_event_refresh(Ref) -> [gen_server2:cast(Q#amqqueue.pid, @@ -562,7 +590,7 @@ consumer_info_keys() -> ?CONSUMER_INFO_KEYS. consumers_all(VHostPath) -> ConsumerInfoKeys=consumer_info_keys(), lists:append( - map(VHostPath, + map(list(VHostPath), fun (Q) -> [lists:zip( ConsumerInfoKeys, @@ -777,7 +805,8 @@ immutable(Q) -> Q#amqqueue{pid = none, down_slave_nodes = none, gm_pids = none, policy = none, - decorators = none}. + decorators = none, + state = none}. deliver([], _Delivery, _Flow) -> %% /dev/null optimisation diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 12a3c9f0..fcf17381 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -235,8 +235,15 @@ terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> terminate(normal, State) -> %% delete case terminate_shutdown(terminate_delete(true, normal, State), State); %% If we crashed don't try to clean up the BQS, probably best to leave it. -terminate(_Reason, State) -> - terminate_shutdown(fun (BQS) -> BQS end, State). +terminate(_Reason, State = #q{q = Q}) -> + terminate_shutdown(fun (BQS) -> + Q2 = Q#amqqueue{state = crashed}, + rabbit_misc:execute_mnesia_transaction( + fun() -> + rabbit_amqqueue:store_queue(Q2) + end), + BQS + end, State). terminate_delete(EmitStats, Reason, State = #q{q = #amqqueue{name = QName}, diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 1bea8042..5ce22271 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -110,7 +110,8 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) -> [Q1 = #amqqueue{gm_pids = GMPids}] = mnesia:read({rabbit_queue, QName}), ok = rabbit_amqqueue:store_queue( - Q1#amqqueue{gm_pids = [{GM, Self} | GMPids]}) + Q1#amqqueue{gm_pids = [{GM, Self} | GMPids], + state = live}) end), {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), %% We need synchronous add here (i.e. do not return until the diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index aec6f93d..826b6927 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -243,7 +243,8 @@ store_updated_slaves(Q = #amqqueue{pid = MPid, SSPids1 = [SSPid || SSPid <- SSPids, lists:member(SSPid, SPids)], DSNs1 = DSNs -- [node(P) || P <- [MPid | SPids]], Q1 = Q#amqqueue{sync_slave_pids = SSPids1, - down_slave_nodes = DSNs1}, + down_slave_nodes = DSNs1, + state = live}, ok = rabbit_amqqueue:store_queue(Q1), %% Wake it up so that we emit a stats event rabbit_amqqueue:notify_policy_changed(Q1), diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 1104f373..9f6dc21a 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -49,6 +49,7 @@ -rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}). -rabbit_upgrade({cluster_name, mnesia, [runtime_parameters]}). -rabbit_upgrade({down_slave_nodes, mnesia, [queue_decorators]}). +-rabbit_upgrade({queue_state, mnesia, [down_slave_nodes]}). %% ------------------------------------------------------------------- @@ -80,6 +81,7 @@ -spec(internal_system_x/0 :: () -> 'ok'). -spec(cluster_name/0 :: () -> 'ok'). -spec(down_slave_nodes/0 :: () -> 'ok'). +-spec(queue_state/0 :: () -> 'ok'). -endif. @@ -400,6 +402,22 @@ down_slave_nodes(Table) -> [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, sync_slave_pids, down_slave_nodes, policy, gm_pids, decorators]). +queue_state() -> + ok = queue_state(rabbit_queue), + ok = queue_state(rabbit_durable_queue). + +queue_state(Table) -> + transform( + Table, + fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators}) -> + {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators, + live} + end, + [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, + sync_slave_pids, down_slave_nodes, policy, gm_pids, decorators, state]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> |