summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-02-15 14:25:18 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-02-15 14:25:18 +0000
commitee9f6d3f80103ba9d6a9692aebfe3202ce845407 (patch)
tree97d634dc5c8f9d8f63502e2e33d349d392d121bf
parent8cef15fec09a57d7da86435cc4c6a9c68e506d1d (diff)
parenta7a37e1e930867d86b23568c3d4ff7ea3464a0b0 (diff)
downloadrabbitmq-server-ee9f6d3f80103ba9d6a9692aebfe3202ce845407.tar.gz
Merge bug24086
-rw-r--r--Makefile2
-rw-r--r--src/rabbit_amqqueue.erl21
-rw-r--r--src/rabbit_amqqueue_process.erl27
-rw-r--r--src/rabbit_misc.erl15
-rw-r--r--src/rabbit_node_monitor.erl23
-rw-r--r--src/rabbit_writer.erl3
6 files changed, 55 insertions, 36 deletions
diff --git a/Makefile b/Makefile
index 31c71dd4..5d019e7e 100644
--- a/Makefile
+++ b/Makefile
@@ -347,7 +347,7 @@ $(foreach XML,$(USAGES_XML),$(eval $(call usage_dep, $(XML))))
# Note that all targets which depend on clean must have clean in their
# name. Also any target that doesn't depend on clean should not have
# clean in its name, unless you know that you don't need any of the
-# automatic dependency generation for that target (eg cleandb).
+# automatic dependency generation for that target (e.g. cleandb).
# We want to load the dep file if *any* target *doesn't* contain
# "clean" - i.e. if removing all clean-like targets leaves something
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index fb4540a3..a7dfd535 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -25,7 +25,7 @@
-export([force_event_refresh/0]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
--export([notify_sent/2, unblock/2, flush_all/2]).
+-export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]).
-export([notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-export([store_queue/1]).
@@ -40,6 +40,8 @@
-define(INTEGER_ARG_TYPES, [byte, short, signedint, long]).
+-define(MORE_CONSUMER_CREDIT_AFTER, 50).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -137,6 +139,7 @@
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
+-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(flush_all/2 :: (qpids(), pid()) -> 'ok').
-spec(internal_delete/1 ::
@@ -461,7 +464,21 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
notify_sent(QPid, ChPid) ->
- gen_server2:cast(QPid, {notify_sent, ChPid}).
+ Key = {consumer_credit_to, QPid},
+ put(Key, case get(Key) of
+ 1 -> gen_server2:cast(
+ QPid, {notify_sent, ChPid,
+ ?MORE_CONSUMER_CREDIT_AFTER}),
+ ?MORE_CONSUMER_CREDIT_AFTER;
+ undefined -> erlang:monitor(process, QPid),
+ ?MORE_CONSUMER_CREDIT_AFTER - 1;
+ C -> C - 1
+ end),
+ ok.
+
+notify_sent_queue_down(QPid) ->
+ erase({consumer_credit_to, QPid}),
+ ok.
unblock(QPid, ChPid) ->
delegate_cast(QPid, {unblock, ChPid}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5629b836..12cd0c93 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -20,7 +20,7 @@
-behaviour(gen_server2).
--define(UNSENT_MESSAGE_LIMIT, 100).
+-define(UNSENT_MESSAGE_LIMIT, 200).
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
@@ -710,15 +710,12 @@ infos(Items, State) ->
|| Item <- (Items1 -- [synchronised_slave_pids])].
slaves_status(#q{q = #amqqueue{name = Name}}) ->
- {ok, #amqqueue{mirror_nodes = MNodes, slave_pids = SPids}} =
- rabbit_amqqueue:lookup(Name),
- case MNodes of
- undefined ->
+ case rabbit_amqqueue:lookup(Name) of
+ {ok, #amqqueue{mirror_nodes = undefined}} ->
[{slave_pids, ''}, {synchronised_slave_pids, ''}];
- _ ->
+ {ok, #amqqueue{slave_pids = SPids}} ->
{Results, _Bad} =
- delegate:invoke(
- SPids, fun (Pid) -> rabbit_mirror_queue_slave:info(Pid) end),
+ delegate:invoke(SPids, fun rabbit_mirror_queue_slave:info/1),
{SPids1, SSPids} =
lists:foldl(
fun ({Pid, Infos}, {SPidsN, SSPidsN}) ->
@@ -762,11 +759,9 @@ i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
i(slave_pids, #q{q = #amqqueue{name = Name}}) ->
- {ok, #amqqueue{mirror_nodes = MNodes,
- slave_pids = SPids}} = rabbit_amqqueue:lookup(Name),
- case MNodes of
- undefined -> [];
- _ -> SPids
+ case rabbit_amqqueue:lookup(Name) of
+ {ok, #amqqueue{mirror_nodes = undefined}} -> [];
+ {ok, #amqqueue{slave_pids = SPids}} -> SPids
end;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
@@ -823,7 +818,7 @@ prioritise_cast(Msg, _State) ->
{set_maximum_since_use, _Age} -> 8;
{ack, _AckTags, _ChPid} -> 7;
{reject, _AckTags, _Requeue, _ChPid} -> 7;
- {notify_sent, _ChPid} -> 7;
+ {notify_sent, _ChPid, _Credit} -> 7;
{unblock, _ChPid} -> 7;
{run_backing_queue, _Mod, _Fun} -> 6;
_ -> 0
@@ -1057,11 +1052,11 @@ handle_cast({unblock, ChPid}, State) ->
possibly_unblock(State, ChPid,
fun (C) -> C#cr{is_limit_active = false} end));
-handle_cast({notify_sent, ChPid}, State) ->
+handle_cast({notify_sent, ChPid, Credit}, State) ->
noreply(
possibly_unblock(State, ChPid,
fun (C = #cr{unsent_message_count = Count}) ->
- C#cr{unsent_message_count = Count - 1}
+ C#cr{unsent_message_count = Count - Credit}
end));
handle_cast({limit, ChPid, Limiter}, State) ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 9a6879b1..b6d38172 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -720,13 +720,14 @@ gb_trees_foreach(Fun, Tree) ->
%% [{"-q",true},{"-p","/"}]}
get_options(Defs, As) ->
lists:foldl(fun(Def, {AsIn, RsIn}) ->
- {AsOut, Value} = case Def of
- {flag, Key} ->
- get_flag(Key, AsIn);
- {option, Key, Default} ->
- get_option(Key, Default, AsIn)
- end,
- {AsOut, [{Key, Value} | RsIn]}
+ {K, {AsOut, V}} =
+ case Def of
+ {flag, Key} ->
+ {Key, get_flag(Key, AsIn)};
+ {option, Key, Default} ->
+ {Key, get_option(Key, Default, AsIn)}
+ end,
+ {AsOut, [{K, V} | RsIn]}
end, {As, []}, Defs).
get_option(K, _Default, [K, V | As]) ->
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 4fc91860..323cf0ce 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -55,29 +55,32 @@ notify_cluster() ->
{_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad])
end,
%% register other active rabbits with this rabbit
- [ rabbit_node_monitor:rabbit_running_on(N) || N <- Nodes ],
+ [ rabbit_running_on(N) || N <- Nodes ],
ok.
%%--------------------------------------------------------------------
init([]) ->
- {ok, no_state}.
+ {ok, ordsets:new()}.
handle_call(_Request, _From, State) ->
{noreply, State}.
-handle_cast({rabbit_running_on, Node}, State) ->
- rabbit_log:info("rabbit on ~p up~n", [Node]),
- erlang:monitor(process, {rabbit, Node}),
- ok = handle_live_rabbit(Node),
- {noreply, State};
+handle_cast({rabbit_running_on, Node}, Nodes) ->
+ case ordsets:is_element(Node, Nodes) of
+ true -> {noreply, Nodes};
+ false -> rabbit_log:info("rabbit on node ~p up~n", [Node]),
+ erlang:monitor(process, {rabbit, Node}),
+ ok = handle_live_rabbit(Node),
+ {noreply, ordsets:add_element(Node, Nodes)}
+ end;
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) ->
- rabbit_log:info("node ~p lost 'rabbit'~n", [Node]),
+handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, Nodes) ->
+ rabbit_log:info("rabbit on node ~p down~n", [Node]),
ok = handle_dead_rabbit(Node),
- {noreply, State};
+ {noreply, ordsets:del_element(Node, Nodes)};
handle_info(_Info, State) ->
{noreply, State}.
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 269128df..dc74b2f5 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -129,6 +129,9 @@ handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
ok = internal_send_command_async(MethodRecord, Content, State),
rabbit_amqqueue:notify_sent(QPid, ChPid),
State;
+handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) ->
+ rabbit_amqqueue:notify_sent_queue_down(QPid),
+ State;
handle_message({inet_reply, _, ok}, State) ->
State;
handle_message({inet_reply, _, Status}, _State) ->