diff options
Diffstat (limited to 'src/rabbit_mirror_queue_misc.erl')
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 94 |
1 files changed, 51 insertions, 43 deletions
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index ca495733..4e9d5aef 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -20,7 +20,7 @@ -export([remove_from_queue/3, on_node_up/0, add_mirrors/3, report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1, is_mirrored/1, update_mirrors/2, validate_policy/1, - maybe_auto_sync/1]). + maybe_auto_sync/1, log_info/3, log_warning/3]). %% for testing only -export([module/1]). @@ -56,6 +56,8 @@ -spec(update_mirrors/2 :: (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). -spec(maybe_auto_sync/1 :: (rabbit_types:amqqueue()) -> 'ok'). +-spec(log_info/3 :: (rabbit_amqqueue:name(), string(), [any()]) -> 'ok'). +-spec(log_warning/3 :: (rabbit_amqqueue:name(), string(), [any()]) -> 'ok'). -endif. @@ -148,66 +150,72 @@ drop_mirrors(QName, Nodes) -> ok. drop_mirror(QName, MirrorNode) -> - rabbit_amqqueue:with( - QName, - fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids }) -> - case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of - [] -> - {error, {queue_not_mirrored_on_node, MirrorNode}}; - [QPid] when SPids =:= [] -> - {error, cannot_drop_only_mirror}; - [Pid] -> - rabbit_log:info( - "Dropping queue mirror on node ~p for ~s~n", - [MirrorNode, rabbit_misc:rs(Name)]), - exit(Pid, {shutdown, dropped}), - {ok, dropped} - end - end). + case rabbit_amqqueue:lookup(QName) of + {ok, #amqqueue { name = Name, pid = QPid, slave_pids = SPids }} -> + case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of + [] -> + {error, {queue_not_mirrored_on_node, MirrorNode}}; + [QPid] when SPids =:= [] -> + {error, cannot_drop_only_mirror}; + [Pid] -> + log_info(Name, "Dropping queue mirror on node ~p~n", + [MirrorNode]), + exit(Pid, {shutdown, dropped}), + {ok, dropped} + end; + {error, not_found} = E -> + E + end. add_mirrors(QName, Nodes, SyncMode) -> [add_mirror(QName, Node, SyncMode) || Node <- Nodes], ok. add_mirror(QName, MirrorNode, SyncMode) -> - rabbit_amqqueue:with( - QName, - fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) -> - case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of - [] -> - start_child(Name, MirrorNode, Q, SyncMode); - [SPid] -> - case rabbit_misc:is_process_alive(SPid) of - true -> {ok, already_mirrored}; - false -> start_child(Name, MirrorNode, Q, SyncMode) - end - end - end). + case rabbit_amqqueue:lookup(QName) of + {ok, #amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q} -> + case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of + [] -> + start_child(Name, MirrorNode, Q, SyncMode); + [SPid] -> + case rabbit_misc:is_process_alive(SPid) of + true -> {ok, already_mirrored}; + false -> start_child(Name, MirrorNode, Q, SyncMode) + end + end; + {error, not_found} = E -> + E + end. start_child(Name, MirrorNode, Q, SyncMode) -> - case rabbit_misc:with_exit_handler( - rabbit_misc:const(down), - fun () -> - rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) - end) of - {ok, SPid} -> rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", - [rabbit_misc:rs(Name), MirrorNode, SPid]), - rabbit_mirror_queue_slave:go(SPid, SyncMode); - _ -> ok - end. + rabbit_misc:with_exit_handler( + rabbit_misc:const(ok), + fun () -> + {ok, SPid} = rabbit_mirror_queue_slave_sup:start_child( + MirrorNode, [Q]), + log_info(Name, "Adding mirror on node ~p: ~p~n", + [MirrorNode, SPid]), + rabbit_mirror_queue_slave:go(SPid, SyncMode) + end). report_deaths(_MirrorPid, _IsMaster, _QueueName, []) -> ok; report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) -> - rabbit_log:info("Mirrored-queue (~s): ~s ~s saw deaths of mirrors ~s~n", - [rabbit_misc:rs(QueueName), - case IsMaster of + log_info(QueueName, "~s ~s saw deaths of mirrors ~s~n", + [case IsMaster of true -> "Master"; false -> "Slave" end, rabbit_misc:pid_to_string(MirrorPid), [[rabbit_misc:pid_to_string(P), $ ] || P <- DeadPids]]). +log_info (QName, Fmt, Args) -> log(info, QName, Fmt, Args). +log_warning(QName, Fmt, Args) -> log(warning, QName, Fmt, Args). + +log(Level, QName, Fmt, Args) -> + rabbit_log:log(mirroring, Level, "Mirrored ~s: " ++ Fmt, + [rabbit_misc:rs(QName) | Args]). + store_updated_slaves(Q = #amqqueue{slave_pids = SPids, sync_slave_pids = SSPids}) -> %% TODO now that we clear sync_slave_pids in rabbit_durable_queue, |