summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Fox <tim@rabbitmq.com>2011-02-10 17:49:33 +0000
committerTim Fox <tim@rabbitmq.com>2011-02-10 17:49:33 +0000
commit06d42a9d8b565a900f0d9af8b166d76baf806f19 (patch)
tree6800e10ae870966bf2cc08ced85bffffd0838cc2
parentd23a7dd42b8ee932dffdc9f9cb0c286bb3cb4982 (diff)
parentcd012caed4385b393ad612aabc12107419e13e15 (diff)
downloadrabbitmq-server-06d42a9d8b565a900f0d9af8b166d76baf806f19.tar.gz
merge default into bug23575
-rw-r--r--ebin/rabbit_app.in9
-rwxr-xr-xscripts/rabbitmq-server1
-rw-r--r--scripts/rabbitmq-server.bat1
-rw-r--r--scripts/rabbitmq-service.bat1
-rw-r--r--src/delegate.erl18
-rw-r--r--src/delegate_sup.erl26
-rw-r--r--src/rabbit.erl12
-rw-r--r--src/rabbit_amqqueue.erl4
-rw-r--r--src/rabbit_amqqueue_process.erl28
-rw-r--r--src/rabbit_misc.erl11
-rw-r--r--src/rabbit_networking.erl16
-rw-r--r--src/rabbit_tests.erl34
12 files changed, 103 insertions, 58 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index cc7221d6..f837684c 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -34,4 +34,11 @@
{collect_statistics, none},
{auth_mechanisms, ['PLAIN', 'AMQPLAIN']},
{auth_backends, [rabbit_auth_backend_internal]},
- {delegate_count, 16}]}]}.
+ {delegate_count, 16},
+ {tcp_listen_options, [binary,
+ {packet, raw},
+ {reuseaddr, true},
+ {backlog, 128},
+ {nodelay, true},
+ {exit_on_close, false}]}
+ ]}]}.
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 5c390a51..2f80eb96 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -16,7 +16,6 @@
##
SERVER_ERL_ARGS="+K true +A30 +P 1048576 \
--kernel inet_default_listen_options [{nodelay,true}] \
-kernel inet_default_connect_options [{nodelay,true}]"
CONFIG_FILE=/etc/rabbitmq/rabbitmq
LOG_BASE=/var/log/rabbitmq
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 0cfa5ea8..2ca9f2b3 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -142,7 +142,6 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
+W w ^
+A30 ^
+P 1048576 ^
--kernel inet_default_listen_options "[{nodelay, true}]" ^
-kernel inet_default_connect_options "[{nodelay, true}]" ^
!RABBITMQ_LISTEN_ARG! ^
-kernel error_logger {file,\""!RABBITMQ_LOG_BASE!/!RABBITMQ_NODENAME!.log"\"} ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 43520b55..bc452fea 100644
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -207,7 +207,6 @@ set ERLANG_SERVICE_ARGUMENTS= ^
-s rabbit ^
+W w ^
+A30 ^
--kernel inet_default_listen_options "[{nodelay,true}]" ^
-kernel inet_default_connect_options "[{nodelay,true}]" ^
!RABBITMQ_LISTEN_ARG! ^
-kernel error_logger {file,\""!RABBITMQ_LOG_BASE!/!RABBITMQ_NODENAME!.log"\"} ^
diff --git a/src/delegate.erl b/src/delegate.erl
index 46bd8245..17046201 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server2).
--export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/1]).
+-export([start_link/1, invoke_no_result/2, invoke/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -36,8 +36,6 @@
([pid()], fun ((pid()) -> A)) -> {[{pid(), A}],
[{pid(), term()}]}).
--spec(delegate_count/1 :: ([node()]) -> non_neg_integer()).
-
-endif.
%%----------------------------------------------------------------------------
@@ -111,22 +109,14 @@ group_pids_by_node(Pids) ->
node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)}
end, {[], orddict:new()}, Pids).
-delegate_count([RemoteNode | _]) ->
- {ok, Count} = case application:get_env(rabbit, delegate_count) of
- undefined -> rpc:call(RemoteNode, application, get_env,
- [rabbit, delegate_count]);
- Result -> Result
- end,
- Count.
-
delegate_name(Hash) ->
list_to_atom("delegate_" ++ integer_to_list(Hash)).
delegate(RemoteNodes) ->
case get(delegate) of
- undefined -> Name =
- delegate_name(erlang:phash2(
- self(), delegate_count(RemoteNodes))),
+ undefined -> Name = delegate_name(
+ erlang:phash2(self(),
+ delegate_sup:count(RemoteNodes))),
put(delegate, Name),
Name;
Name -> Name
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
index e0ffa7c8..fc693c7d 100644
--- a/src/delegate_sup.erl
+++ b/src/delegate_sup.erl
@@ -18,7 +18,7 @@
-behaviour(supervisor).
--export([start_link/0]).
+-export([start_link/1, count/1]).
-export([init/1]).
@@ -28,20 +28,32 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
+-spec(start_link/1 :: (integer()) -> {'ok', pid()} | {'error', any()}).
+-spec(count/1 :: ([node()]) -> integer()).
-endif.
%%----------------------------------------------------------------------------
-start_link() ->
- supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+start_link(Count) ->
+ supervisor:start_link({local, ?SERVER}, ?MODULE, [Count]).
+
+count([]) ->
+ 1;
+count([Node | Nodes]) ->
+ try
+ length(supervisor:which_children({?SERVER, Node}))
+ catch exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown ->
+ count(Nodes);
+ exit:{R, _} when R =:= noproc; R =:= normal; R =:= shutdown;
+ R =:= nodedown ->
+ count(Nodes)
+ end.
%%----------------------------------------------------------------------------
-init(_Args) ->
- DCount = delegate:delegate_count([node()]),
+init([Count]) ->
{ok, {{one_for_one, 10, 10},
[{Num, {delegate, start_link, [Num]},
transient, 16#ffffffff, worker, [delegate]} ||
- Num <- lists:seq(0, DCount - 1)]}}.
+ Num <- lists:seq(0, Count - 1)]}}.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 67e2e40f..1beed5c1 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -27,7 +27,7 @@
%%---------------------------------------------------------------------------
%% Boot steps.
--export([maybe_insert_default_data/0]).
+-export([maybe_insert_default_data/0, boot_delegate/0]).
-rabbit_boot_step({codec_correctness_check,
[{description, "codec correctness check"},
@@ -101,8 +101,7 @@
-rabbit_boot_step({delegate_sup,
[{description, "cluster delegate"},
- {mfa, {rabbit_sup, start_child,
- [delegate_sup]}},
+ {mfa, {rabbit, boot_delegate, []}},
{requires, kernel_ready},
{enables, core_initialized}]}).
@@ -184,6 +183,9 @@
{running_nodes, [node()]}]).
-spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()).
+-spec(maybe_insert_default_data/0 :: () -> 'ok').
+-spec(boot_delegate/0 :: () -> 'ok').
+
-endif.
%%----------------------------------------------------------------------------
@@ -453,6 +455,10 @@ ensure_working_log_handler(OldFHandler, NewFHandler, TTYHandler,
end
end.
+boot_delegate() ->
+ {ok, Count} = application:get_env(rabbit, delegate_count),
+ rabbit_sup:start_child(delegate_sup, [Count]).
+
maybe_insert_default_data() ->
case rabbit_mnesia:is_db_empty() of
true -> insert_default_data();
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 2545b07c..6e5aae27 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -218,7 +218,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
rabbit_misc:const(not_found)
end;
[ExistingQ = #amqqueue{pid = QPid}] ->
- case is_process_alive(QPid) of
+ case rabbit_misc:is_process_alive(QPid) of
true -> rabbit_misc:const(ExistingQ);
false -> TailFun = internal_delete(QueueName),
fun (Tx) -> TailFun(Tx), ExistingQ end
@@ -422,7 +422,7 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
infinity).
notify_sent(QPid, ChPid) ->
- delegate_cast(QPid, {notify_sent, ChPid}).
+ gen_server2:cast(QPid, {notify_sent, ChPid}).
unblock(QPid, ChPid) ->
delegate_cast(QPid, {unblock, ChPid}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7c7e28fe..496b2064 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -790,20 +790,20 @@ handle_call({init, Recover}, From,
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = Owner}}) ->
- case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of
- true -> erlang:monitor(process, Owner),
- declare(Recover, From, State);
- _ -> #q{q = #amqqueue{name = QName, durable = IsDurable},
- backing_queue = BQ, backing_queue_state = undefined} = State,
- gen_server2:reply(From, not_found),
- case Recover of
- true -> ok;
- _ -> rabbit_log:warning(
- "Queue ~p exclusive owner went away~n", [QName])
- end,
- BQS = BQ:init(QName, IsDurable, Recover),
- %% Rely on terminate to delete the queue.
- {stop, normal, State#q{backing_queue_state = BQS}}
+ case rabbit_misc:is_process_alive(Owner) of
+ true -> erlang:monitor(process, Owner),
+ declare(Recover, From, State);
+ false -> #q{backing_queue = BQ, backing_queue_state = undefined,
+ q = #amqqueue{name = QName, durable = IsDurable}} = State,
+ gen_server2:reply(From, not_found),
+ case Recover of
+ true -> ok;
+ _ -> rabbit_log:warning(
+ "Queue ~p exclusive owner went away~n", [QName])
+ end,
+ BQS = BQ:init(QName, IsDurable, Recover),
+ %% Rely on terminate to delete the queue.
+ {stop, normal, State#q{backing_queue_state = BQS}}
end;
handle_call(info, _From, State) ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index e36b1dd1..abc27c5f 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -56,6 +56,7 @@
-export([lock_file/1]).
-export([const_ok/1, const/1]).
-export([ntoa/1, ntoab/1]).
+-export([is_process_alive/1]).
%%----------------------------------------------------------------------------
@@ -194,6 +195,7 @@
-spec(const/1 :: (A) -> const(A)).
-spec(ntoa/1 :: (inet:ip_address()) -> string()).
-spec(ntoab/1 :: (inet:ip_address()) -> string()).
+-spec(is_process_alive/1 :: (pid()) -> boolean()).
-endif.
@@ -861,3 +863,12 @@ ntoab(IP) ->
0 -> Str;
_ -> "[" ++ Str ++ "]"
end.
+
+is_process_alive(Pid) when node(Pid) =:= node() ->
+ erlang:is_process_alive(Pid);
+is_process_alive(Pid) ->
+ case rpc:call(node(Pid), erlang, is_process_alive, [Pid]) of
+ true -> true;
+ _ -> false
+ end.
+
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 283d25c7..36f61628 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -32,16 +32,6 @@
-include("rabbit.hrl").
-include_lib("kernel/include/inet.hrl").
--define(RABBIT_TCP_OPTS, [
- binary,
- {packet, raw}, % no packaging
- {reuseaddr, true}, % allow rebind without waiting
- {backlog, 128}, % use the maximum listen(2) backlog value
- %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg.
- %% {delay_send, true},
- {exit_on_close, false}
- ]).
-
-define(SSL_TIMEOUT, 5). %% seconds
-define(FIRST_TEST_BIND_PORT, 10000).
@@ -200,7 +190,7 @@ start_listener0({IPAddress, Port, Family, Name}, Protocol, Label, OnConnect) ->
rabbit_sup,
{Name,
{tcp_listener_sup, start_link,
- [IPAddress, Port, [Family | ?RABBIT_TCP_OPTS],
+ [IPAddress, Port, [Family | tcp_opts()],
{?MODULE, tcp_listener_started, [Protocol]},
{?MODULE, tcp_listener_stopped, [Protocol]},
OnConnect, Label]},
@@ -315,6 +305,10 @@ hostname() ->
cmap(F) -> rabbit_misc:filter_exit_map(F, connections()).
+tcp_opts() ->
+ {ok, Opts} = application:get_env(rabbit, tcp_listen_options),
+ Opts.
+
%%--------------------------------------------------------------------
%% There are three kinds of machine (for our purposes).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index ddb53b15..58c369b5 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -82,6 +82,7 @@ run_cluster_dependent_tests(SecondaryNode) ->
passed = test_delegates_async(SecondaryNode),
passed = test_delegates_sync(SecondaryNode),
passed = test_queue_cleanup(SecondaryNode),
+ passed = test_declare_on_dead_queue(SecondaryNode),
%% we now run the tests remotely, so that code coverage on the
%% local node picks up more of the delegate
@@ -90,13 +91,14 @@ run_cluster_dependent_tests(SecondaryNode) ->
Remote = spawn(SecondaryNode,
fun () -> Rs = [ test_delegates_async(Node),
test_delegates_sync(Node),
- test_queue_cleanup(Node) ],
+ test_queue_cleanup(Node),
+ test_declare_on_dead_queue(Node) ],
Self ! {self(), Rs}
end),
receive
{Remote, Result} ->
- Result = [passed, passed, passed]
- after 2000 ->
+ Result = lists:duplicate(length(Result), passed)
+ after 30000 ->
throw(timeout)
end,
@@ -1310,6 +1312,32 @@ test_queue_cleanup(_SecondaryNode) ->
end,
passed.
+test_declare_on_dead_queue(SecondaryNode) ->
+ QueueName = rabbit_misc:r(<<"/">>, queue, ?CLEANUP_QUEUE_NAME),
+ Self = self(),
+ Pid = spawn(SecondaryNode,
+ fun () ->
+ {new, #amqqueue{name = QueueName, pid = QPid}} =
+ rabbit_amqqueue:declare(QueueName, false, false, [],
+ none),
+ exit(QPid, kill),
+ Self ! {self(), killed, QPid}
+ end),
+ receive
+ {Pid, killed, QPid} ->
+ {existing, #amqqueue{name = QueueName,
+ pid = QPid}} =
+ rabbit_amqqueue:declare(QueueName, false, false, [], none),
+ false = rabbit_misc:is_process_alive(QPid),
+ {new, Q} = rabbit_amqqueue:declare(QueueName, false, false, [],
+ none),
+ true = rabbit_misc:is_process_alive(Q#amqqueue.pid),
+ {ok, 0} = rabbit_amqqueue:delete(Q, false, false),
+ passed
+ after 2000 ->
+ throw(failed_to_create_and_kill_queue)
+ end.
+
%---------------------------------------------------------------------
control_action(Command, Args) ->