summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/rabbitmqctl.1.xml12
-rw-r--r--include/rabbit.hrl3
-rw-r--r--src/rabbit_amqqueue.erl87
-rw-r--r--src/rabbit_amqqueue_process.erl11
-rw-r--r--src/rabbit_mirror_queue_master.erl3
-rw-r--r--src/rabbit_mirror_queue_misc.erl3
-rw-r--r--src/rabbit_upgrade_functions.erl18
7 files changed, 99 insertions, 38 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index eb3c7ef3..2b70587a 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1253,10 +1253,14 @@
message loss.</para></listitem>
</varlistentry>
<varlistentry>
- <term>status</term>
- <listitem><para>The status of the queue. Normally
- 'running', but may be "{syncing, MsgCount}" if the queue is
- synchronising.</para></listitem>
+ <term>state</term>
+ <listitem><para>The state of the queue. Normally
+ 'running', but may be "{syncing, MsgCount}" if the
+ queue is synchronising. Queues which are located on
+ cluster nodes that are currently down will be shown
+ with a status of 'down' (and most other
+ <command>queueinfoitem</command>s will be
+ unavailable).</para></listitem>
</varlistentry>
</variablelist>
<para>
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 5e41ea93..74e165cd 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -55,7 +55,8 @@
down_slave_nodes, %% durable
policy, %% durable, implicit update as above
gm_pids, %% transient
- decorators}). %% transient, recalculated as above
+ decorators, %% transient, recalculated as above
+ state}). %% durable (have we crashed?)
-record(exchange_serial, {name, next}).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 63a9529a..66f04381 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -25,6 +25,7 @@
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
+-export([list_down/1]).
-export([force_event_refresh/1, notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
-export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]).
@@ -108,6 +109,7 @@
(name(), pid(), qfun(A)) -> A | rabbit_types:channel_exit()).
-spec(list/0 :: () -> [rabbit_types:amqqueue()]).
-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:amqqueue()]).
+-spec(list_down/1 :: (rabbit_types:vhost()) -> [rabbit_types:amqqueue()]).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (rabbit_types:amqqueue()) -> rabbit_types:infos()).
-spec(info/2 ::
@@ -272,7 +274,8 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) ->
slave_pids = [],
sync_slave_pids = [],
down_slave_nodes = [],
- gm_pids = []})),
+ gm_pids = [],
+ state = live})),
Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node),
gen_server2:call(
rabbit_amqqueue_sup_sup:start_queue_process(Node, Q, declare),
@@ -280,7 +283,10 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) ->
internal_declare(Q, true) ->
rabbit_misc:execute_mnesia_tx_with_tail(
- fun () -> ok = store_queue(Q), rabbit_misc:const(Q) end);
+ fun () ->
+ ok = store_queue(Q#amqqueue{state = live}),
+ rabbit_misc:const(Q)
+ end);
internal_declare(Q = #amqqueue{name = QueueName}, false) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
@@ -288,7 +294,8 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
[] ->
case not_found_or_absent(QueueName) of
not_found -> Q1 = rabbit_policy:set(Q),
- ok = store_queue(Q1),
+ Q2 = Q1#amqqueue{state = live},
+ ok = store_queue(Q2),
B = add_default_binding(Q1),
fun () -> B(), Q1 end;
{absent, _Q, _} = R -> rabbit_misc:const(R)
@@ -381,6 +388,8 @@ not_found_or_absent_dirty(Name) ->
with(Name, F, E) ->
case lookup(Name) of
+ {ok, Q = #amqqueue{state = crashed}} ->
+ E({absent, Q, crashed});
{ok, Q = #amqqueue{pid = QPid}} ->
%% We check is_process_alive(QPid) in case we receive a
%% nodedown (for example) in F() that has nothing to do
@@ -390,11 +399,8 @@ with(Name, F, E) ->
%% the retry loop.
rabbit_misc:with_exit_handler(
fun () -> false = rabbit_misc:is_process_alive(QPid),
- case crashed_or_recovering(Q) of
- crashed -> E({absent, Q, crashed});
- recovering -> timer:sleep(25),
- with(Name, F, E)
- end
+ timer:sleep(25),
+ with(Name, F, E)
end, fun () -> F(Q) end);
{error, not_found} ->
E(not_found_or_absent_dirty(Name))
@@ -407,20 +413,6 @@ with_or_die(Name, F) ->
({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason)
end).
-%% TODO we could say we are crashed when we mean recovering if we
-%% happen to call in the middle of a crash-failover. We could try to
-%% figure out whether that's happening by looking for the supervisor -
-%% but we'd need some additional book keeping to know what it is. And
-%% it will just mean a temporary glitch while crashing, which is
-%% fairly tolerable.
-crashed_or_recovering(#amqqueue{pid = QPid, slave_pids = []}) ->
- case lists:member(node(QPid), [node() | nodes()]) of
- true -> crashed;
- false -> recovering
- end;
-crashed_or_recovering(_Q) ->
- recovering.
-
assert_equivalence(#amqqueue{name = QName,
durable = Durable,
auto_delete = AD} = Q,
@@ -520,32 +512,68 @@ check_dlxrk_arg({Type, _}, _Args) ->
list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}).
+list(VHostPath) -> list(VHostPath, rabbit_queue).
+
%% Not dirty_match_object since that would not be transactional when used in a
%% tx context
-list(VHostPath) ->
+list(VHostPath, TableName) ->
mnesia:async_dirty(
fun () ->
mnesia:match_object(
- rabbit_queue,
+ TableName,
#amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'},
read)
end).
+list_down(VHostPath) ->
+ Present = list(VHostPath),
+ Durable = list(VHostPath, rabbit_durable_queue),
+ PresentS = sets:from_list([N || #amqqueue{name = N} <- Present]),
+ sets:to_list(sets:filter(fun (#amqqueue{name = N}) ->
+ not sets:is_element(N, PresentS)
+ end, sets:from_list(Durable))).
+
info_keys() -> rabbit_amqqueue_process:info_keys().
-map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
+map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs).
+info(Q = #amqqueue{ state = crashed }) -> info_down(Q, crashed);
info(#amqqueue{ pid = QPid }) -> delegate:call(QPid, info).
+info(Q = #amqqueue{ state = crashed }, Items) ->
+ info_down(Q, Items, crashed);
info(#amqqueue{ pid = QPid }, Items) ->
case delegate:call(QPid, {info, Items}) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
-info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
+info_down(Q, DownReason) ->
+ info_down(Q, rabbit_amqqueue_process:info_keys(), DownReason).
+
+info_down(Q, Items, DownReason) ->
+ [{Item, i_down(Item, Q, DownReason)} || Item <- Items].
+
+i_down(name, #amqqueue{name = Name}, _) -> Name;
+i_down(durable, #amqqueue{durable = Durable},_) -> Durable;
+i_down(auto_delete, #amqqueue{auto_delete = AD}, _) -> AD;
+i_down(arguments, #amqqueue{arguments = Args}, _) -> Args;
+i_down(pid, #amqqueue{pid = QPid}, _) -> QPid;
+i_down(down_slave_nodes, #amqqueue{down_slave_nodes = DSN}, _) -> DSN;
+i_down(state, _Q, DownReason) -> DownReason;
+i_down(K, _Q, _DownReason) ->
+ case lists:member(K, rabbit_amqqueue_process:info_keys()) of
+ true -> '';
+ false -> throw({bad_argument, K})
+ end.
+
+info_all(VHostPath) ->
+ map(list(VHostPath), fun (Q) -> info(Q) end) ++
+ map(list_down(VHostPath), fun (Q) -> info_down(Q, down) end).
-info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
+info_all(VHostPath, Items) ->
+ map(list(VHostPath), fun (Q) -> info(Q, Items) end) ++
+ map(list_down(VHostPath), fun (Q) -> info_down(Q, Items, down) end).
force_event_refresh(Ref) ->
[gen_server2:cast(Q#amqqueue.pid,
@@ -562,7 +590,7 @@ consumer_info_keys() -> ?CONSUMER_INFO_KEYS.
consumers_all(VHostPath) ->
ConsumerInfoKeys=consumer_info_keys(),
lists:append(
- map(VHostPath,
+ map(list(VHostPath),
fun (Q) ->
[lists:zip(
ConsumerInfoKeys,
@@ -777,7 +805,8 @@ immutable(Q) -> Q#amqqueue{pid = none,
down_slave_nodes = none,
gm_pids = none,
policy = none,
- decorators = none}.
+ decorators = none,
+ state = none}.
deliver([], _Delivery, _Flow) ->
%% /dev/null optimisation
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 12a3c9f0..fcf17381 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -235,8 +235,15 @@ terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
terminate(normal, State) -> %% delete case
terminate_shutdown(terminate_delete(true, normal, State), State);
%% If we crashed don't try to clean up the BQS, probably best to leave it.
-terminate(_Reason, State) ->
- terminate_shutdown(fun (BQS) -> BQS end, State).
+terminate(_Reason, State = #q{q = Q}) ->
+ terminate_shutdown(fun (BQS) ->
+ Q2 = Q#amqqueue{state = crashed},
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ rabbit_amqqueue:store_queue(Q2)
+ end),
+ BQS
+ end, State).
terminate_delete(EmitStats, Reason,
State = #q{q = #amqqueue{name = QName},
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 1bea8042..5ce22271 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -110,7 +110,8 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
[Q1 = #amqqueue{gm_pids = GMPids}]
= mnesia:read({rabbit_queue, QName}),
ok = rabbit_amqqueue:store_queue(
- Q1#amqqueue{gm_pids = [{GM, Self} | GMPids]})
+ Q1#amqqueue{gm_pids = [{GM, Self} | GMPids],
+ state = live})
end),
{_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
%% We need synchronous add here (i.e. do not return until the
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index aec6f93d..826b6927 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -243,7 +243,8 @@ store_updated_slaves(Q = #amqqueue{pid = MPid,
SSPids1 = [SSPid || SSPid <- SSPids, lists:member(SSPid, SPids)],
DSNs1 = DSNs -- [node(P) || P <- [MPid | SPids]],
Q1 = Q#amqqueue{sync_slave_pids = SSPids1,
- down_slave_nodes = DSNs1},
+ down_slave_nodes = DSNs1,
+ state = live},
ok = rabbit_amqqueue:store_queue(Q1),
%% Wake it up so that we emit a stats event
rabbit_amqqueue:notify_policy_changed(Q1),
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 1104f373..9f6dc21a 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -49,6 +49,7 @@
-rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}).
-rabbit_upgrade({cluster_name, mnesia, [runtime_parameters]}).
-rabbit_upgrade({down_slave_nodes, mnesia, [queue_decorators]}).
+-rabbit_upgrade({queue_state, mnesia, [down_slave_nodes]}).
%% -------------------------------------------------------------------
@@ -80,6 +81,7 @@
-spec(internal_system_x/0 :: () -> 'ok').
-spec(cluster_name/0 :: () -> 'ok').
-spec(down_slave_nodes/0 :: () -> 'ok').
+-spec(queue_state/0 :: () -> 'ok').
-endif.
@@ -400,6 +402,22 @@ down_slave_nodes(Table) ->
[name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
sync_slave_pids, down_slave_nodes, policy, gm_pids, decorators]).
+queue_state() ->
+ ok = queue_state(rabbit_queue),
+ ok = queue_state(rabbit_durable_queue).
+
+queue_state(Table) ->
+ transform(
+ Table,
+ fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators}) ->
+ {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators,
+ live}
+ end,
+ [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
+ sync_slave_pids, down_slave_nodes, policy, gm_pids, decorators, state]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->