summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-10-26 08:52:47 +0000
committerMatthias Radestock <matthias@lshift.net>2009-10-26 08:52:47 +0000
commit3ed2a437a10a7ecd4b1e476180d8872bc0783e52 (patch)
tree94a153ad57fad847f99598faec257424a5283c05
parent209e03ae10094cad082a7184726506615ec7bffe (diff)
parentbbb953bee4b8da350d2ab5581a770a7b0526ac89 (diff)
downloadrabbitmq-server-3ed2a437a10a7ecd4b1e476180d8872bc0783e52.tar.gz
merge bug21777 into default
-rw-r--r--docs/rabbitmqctl.1.pod2
-rw-r--r--src/rabbit_amqqueue_process.erl23
-rw-r--r--src/rabbit_control.erl19
-rw-r--r--src/rabbit_misc.erl26
-rw-r--r--src/rabbit_multi.erl26
-rw-r--r--src/rabbit_tests.erl2
6 files changed, 55 insertions, 43 deletions
diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod
index c43ed2ea..6b420872 100644
--- a/docs/rabbitmqctl.1.pod
+++ b/docs/rabbitmqctl.1.pod
@@ -279,7 +279,7 @@ exchange arguments
=item list_bindings [-p I<vhostpath>]
List bindings by virtual host. Each line printed describes a binding,
-with the exchange name, routing key, queue name and arguments,
+with the exchange name, queue name, routing key and arguments,
separated by tab characters.
=item list_connections [I<connectioninfoitem> ...]
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index fe2e8509..c78edcfa 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -321,8 +321,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
blocked_consumers = remove_consumers(
ChPid, State#q.blocked_consumers)}),
case should_auto_delete(NewState) of
- false -> noreply(NewState);
- true -> {stop, normal, NewState}
+ false -> {ok, NewState};
+ true -> {stop, NewState}
end
end.
@@ -576,10 +576,16 @@ handle_call({commit, Txn}, From, State) ->
erase_tx(Txn),
noreply(NewState);
-handle_call({notify_down, ChPid}, From, State) ->
- %% optimisation: we reply straight away so the sender can continue
- gen_server2:reply(From, ok),
- handle_ch_down(ChPid, State);
+handle_call({notify_down, ChPid}, _From, State) ->
+ %% we want to do this synchronously, so that auto_deleted queues
+ %% are no longer visible by the time we send a response to the
+ %% client. The queue is ultimately deleted in terminate/2; if we
+ %% return stop with a reply, terminate/2 will be called by
+ %% gen_server2 *before* the reply is sent.
+ case handle_ch_down(ChPid, State) of
+ {ok, NewState} -> reply(ok, NewState);
+ {stop, NewState} -> {stop, normal, ok, NewState}
+ end;
handle_call({basic_get, ChPid, NoAck}, _From,
State = #q{q = #amqqueue{name = QName},
@@ -813,7 +819,10 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
NewState = State#q{owner = none},
{stop, normal, NewState};
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
- handle_ch_down(DownPid, State);
+ case handle_ch_down(DownPid, State) of
+ {ok, NewState} -> noreply(NewState);
+ {stop, NewState} -> {stop, normal, NewState}
+ end;
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index a53ac289..19579729 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -52,12 +52,11 @@
%%----------------------------------------------------------------------------
start() ->
- {ok, [[NodeNameStr|_]|_]} = init:get_argument(nodename),
- NodeName = list_to_atom(NodeNameStr),
+ {ok, [[NodeStr|_]|_]} = init:get_argument(nodename),
FullCommand = init:get_plain_arguments(),
#params{quiet = Quiet, node = Node, command = Command, args = Args} =
parse_args(FullCommand, #params{quiet = false,
- node = rabbit_misc:localnode(NodeName)}),
+ node = rabbit_misc:makenode(NodeStr)}),
Inform = case Quiet of
true -> fun(_Format, _Args1) -> ok end;
false -> fun(Format, Args1) ->
@@ -97,12 +96,12 @@ error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args).
print_badrpc_diagnostics(Node) ->
fmt_stderr("diagnostics:", []),
- NodeHost = rabbit_misc:nodehost(Node),
+ {_NodeName, NodeHost} = rabbit_misc:nodeparts(Node),
case net_adm:names(NodeHost) of
{error, EpmdReason} ->
fmt_stderr("- unable to connect to epmd on ~s: ~w",
[NodeHost, EpmdReason]);
- {ok, NamePorts} ->
+ {ok, NamePorts} ->
fmt_stderr("- nodes and their ports on ~s: ~p",
[NodeHost, [{list_to_atom(Name), Port} ||
{Name, Port} <- NamePorts]])
@@ -116,11 +115,7 @@ print_badrpc_diagnostics(Node) ->
ok.
parse_args(["-n", NodeS | Args], Params) ->
- Node = case lists:member($@, NodeS) of
- true -> list_to_atom(NodeS);
- false -> rabbit_misc:localnode(list_to_atom(NodeS))
- end,
- parse_args(Args, Params#params{node = Node});
+ parse_args(Args, Params#params{node = rabbit_misc:makenode(NodeS)});
parse_args(["-q" | Args], Params) ->
parse_args(Args, Params#params{quiet = true});
parse_args([Command | Args], Params) ->
@@ -186,7 +181,7 @@ messages, acks_uncommitted, consumers, transactions, memory]. The default is
auto_delete, arguments]. The default is to display name and type.
The output format for \"list_bindings\" is a list of rows containing
-exchange name, routing key, queue name and arguments, in that order.
+exchange name, queue name, routing key and arguments, in that order.
<ConnectionInfoItem> must be a member of the list [node, address, port,
peer_address, peer_port, state, channels, user, vhost, timeout, frame_max,
@@ -290,7 +285,7 @@ action(list_exchanges, Node, Args, Inform) ->
action(list_bindings, Node, Args, Inform) ->
Inform("Listing bindings", []),
{VHostArg, _} = parse_vhost_flag_bin(Args),
- InfoKeys = [exchange_name, routing_key, queue_name, args],
+ InfoKeys = [exchange_name, queue_name, routing_key, args],
display_info_list(
[lists:zip(InfoKeys, tuple_to_list(X)) ||
X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])],
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index b20e9a86..d84c570b 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -47,7 +47,7 @@
-export([with_user/2, with_vhost/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
-export([ensure_ok/2]).
--export([localnode/1, nodehost/1, cookie_hash/0, tcp_name/3]).
+-export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]).
-export([intersperse/2, upmap/2, map_in_order/2]).
-export([table_foreach/2]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
@@ -105,8 +105,8 @@
-spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A).
-spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A).
-spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok').
--spec(localnode/1 :: (atom()) -> erlang_node()).
--spec(nodehost/1 :: (erlang_node()) -> string()).
+-spec(makenode/1 :: ({string(), string()} | string()) -> erlang_node()).
+-spec(nodeparts/1 :: (erlang_node() | string()) -> {string(), string()}).
-spec(cookie_hash/0 :: () -> string()).
-spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()).
-spec(intersperse/2 :: (A, [A]) -> [A]).
@@ -308,13 +308,19 @@ execute_mnesia_transaction(TxFun) ->
ensure_ok(ok, _) -> ok;
ensure_ok({error, Reason}, ErrorTag) -> throw({error, {ErrorTag, Reason}}).
-localnode(Name) ->
- list_to_atom(lists:append([atom_to_list(Name), "@", nodehost(node())])).
-
-nodehost(Node) ->
- %% This is horrible, but there doesn't seem to be a way to split a
- %% nodename into its constituent parts.
- tl(lists:dropwhile(fun (E) -> E =/= $@ end, atom_to_list(Node))).
+makenode({Prefix, Suffix}) ->
+ list_to_atom(lists:append([Prefix, "@", Suffix]));
+makenode(NodeStr) ->
+ makenode(nodeparts(NodeStr)).
+
+nodeparts(Node) when is_atom(Node) ->
+ nodeparts(atom_to_list(Node));
+nodeparts(NodeStr) ->
+ case lists:splitwith(fun (E) -> E =/= $@ end, NodeStr) of
+ {Prefix, []} -> {_, Suffix} = nodeparts(node()),
+ {Prefix, Suffix};
+ {Prefix, Suffix} -> {Prefix, tl(Suffix)}
+ end.
cookie_hash() ->
ssl_base64:encode(erlang:md5(atom_to_list(erlang:get_cookie()))).
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index b1cc4d02..f364872e 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -100,10 +100,12 @@ Available commands:
action(start_all, [NodeCount], RpcTimeout) ->
io:format("Starting all nodes...~n", []),
N = list_to_integer(NodeCount),
- {NodePids, Running} = start_nodes(N, N, [], true,
- getenv("RABBITMQ_NODENAME"),
- getenv("RABBITMQ_NODE_PORT"),
- RpcTimeout),
+ {NodePids, Running} =
+ start_nodes(N, N, [], true,
+ rabbit_misc:nodeparts(
+ getenv("RABBITMQ_NODENAME")),
+ list_to_integer(getenv("RABBITMQ_NODE_PORT")),
+ RpcTimeout),
write_pids_file(NodePids),
case Running of
true -> ok;
@@ -158,24 +160,24 @@ start_nodes(0, _, PNodePid, Running, _, _, _) -> {PNodePid, Running};
start_nodes(N, Total, PNodePid, Running,
NodeNameBase, NodePortBase, RpcTimeout) ->
+ {NodePre, NodeSuff} = NodeNameBase,
NodeNumber = Total - N,
- NodeName = if NodeNumber == 0 ->
+ NodePre1 = if NodeNumber == 0 ->
%% For compatibility with running a single node
- NodeNameBase;
+ NodePre;
true ->
- NodeNameBase ++ "_" ++ integer_to_list(NodeNumber)
+ NodePre ++ "_" ++ integer_to_list(NodeNumber)
end,
- {NodePid, Started} = start_node(NodeName,
- list_to_integer(NodePortBase) + NodeNumber,
+ {NodePid, Started} = start_node(rabbit_misc:makenode({NodePre1, NodeSuff}),
+ NodePortBase + NodeNumber,
RpcTimeout),
start_nodes(N - 1, Total, [NodePid | PNodePid],
Started and Running,
NodeNameBase, NodePortBase, RpcTimeout).
-start_node(NodeName, NodePort, RpcTimeout) ->
- os:putenv("RABBITMQ_NODENAME", NodeName),
+start_node(Node, NodePort, RpcTimeout) ->
+ os:putenv("RABBITMQ_NODENAME", atom_to_list(Node)),
os:putenv("RABBITMQ_NODE_PORT", integer_to_list(NodePort)),
- Node = rabbit_misc:localnode(list_to_atom(NodeName)),
io:format("Starting node ~s...~n", [Node]),
case rpc:call(Node, os, getpid, []) of
{badrpc, _} ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 5c5c55f1..c5a7d05e 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -495,7 +495,7 @@ test_cluster_management() ->
ok = control_action(cluster, ["invalid1@invalid",
"invalid2@invalid"]),
- SecondaryNode = rabbit_misc:localnode(hare),
+ SecondaryNode = rabbit_misc:makenode("hare"),
case net_adm:ping(SecondaryNode) of
pong -> passed = test_cluster_management2(SecondaryNode);
pang -> io:format("Skipping clustering tests with node ~p~n",