diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-02-15 14:25:18 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-02-15 14:25:18 +0000 |
commit | ee9f6d3f80103ba9d6a9692aebfe3202ce845407 (patch) | |
tree | 97d634dc5c8f9d8f63502e2e33d349d392d121bf | |
parent | 8cef15fec09a57d7da86435cc4c6a9c68e506d1d (diff) | |
parent | a7a37e1e930867d86b23568c3d4ff7ea3464a0b0 (diff) | |
download | rabbitmq-server-ee9f6d3f80103ba9d6a9692aebfe3202ce845407.tar.gz |
Merge bug24086
-rw-r--r-- | Makefile | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 21 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 27 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 15 | ||||
-rw-r--r-- | src/rabbit_node_monitor.erl | 23 | ||||
-rw-r--r-- | src/rabbit_writer.erl | 3 |
6 files changed, 55 insertions, 36 deletions
@@ -347,7 +347,7 @@ $(foreach XML,$(USAGES_XML),$(eval $(call usage_dep, $(XML)))) # Note that all targets which depend on clean must have clean in their # name. Also any target that doesn't depend on clean should not have # clean in its name, unless you know that you don't need any of the -# automatic dependency generation for that target (eg cleandb). +# automatic dependency generation for that target (e.g. cleandb). # We want to load the dep file if *any* target *doesn't* contain # "clean" - i.e. if removing all clean-like targets leaves something diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index fb4540a3..a7dfd535 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -25,7 +25,7 @@ -export([force_event_refresh/0]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). --export([notify_sent/2, unblock/2, flush_all/2]). +-export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]). -export([notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -export([store_queue/1]). @@ -40,6 +40,8 @@ -define(INTEGER_ARG_TYPES, [byte, short, signedint, long]). +-define(MORE_CONSUMER_CREDIT_AFTER, 50). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -137,6 +139,7 @@ -spec(basic_cancel/4 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). +-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). -spec(flush_all/2 :: (qpids(), pid()) -> 'ok'). -spec(internal_delete/1 :: @@ -461,7 +464,21 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). notify_sent(QPid, ChPid) -> - gen_server2:cast(QPid, {notify_sent, ChPid}). + Key = {consumer_credit_to, QPid}, + put(Key, case get(Key) of + 1 -> gen_server2:cast( + QPid, {notify_sent, ChPid, + ?MORE_CONSUMER_CREDIT_AFTER}), + ?MORE_CONSUMER_CREDIT_AFTER; + undefined -> erlang:monitor(process, QPid), + ?MORE_CONSUMER_CREDIT_AFTER - 1; + C -> C - 1 + end), + ok. + +notify_sent_queue_down(QPid) -> + erase({consumer_credit_to, QPid}), + ok. unblock(QPid, ChPid) -> delegate_cast(QPid, {unblock, ChPid}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5629b836..12cd0c93 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -20,7 +20,7 @@ -behaviour(gen_server2). --define(UNSENT_MESSAGE_LIMIT, 100). +-define(UNSENT_MESSAGE_LIMIT, 200). -define(SYNC_INTERVAL, 25). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). @@ -710,15 +710,12 @@ infos(Items, State) -> || Item <- (Items1 -- [synchronised_slave_pids])]. slaves_status(#q{q = #amqqueue{name = Name}}) -> - {ok, #amqqueue{mirror_nodes = MNodes, slave_pids = SPids}} = - rabbit_amqqueue:lookup(Name), - case MNodes of - undefined -> + case rabbit_amqqueue:lookup(Name) of + {ok, #amqqueue{mirror_nodes = undefined}} -> [{slave_pids, ''}, {synchronised_slave_pids, ''}]; - _ -> + {ok, #amqqueue{slave_pids = SPids}} -> {Results, _Bad} = - delegate:invoke( - SPids, fun (Pid) -> rabbit_mirror_queue_slave:info(Pid) end), + delegate:invoke(SPids, fun rabbit_mirror_queue_slave:info/1), {SPids1, SSPids} = lists:foldl( fun ({Pid, Infos}, {SPidsN, SSPidsN}) -> @@ -762,11 +759,9 @@ i(memory, _) -> {memory, M} = process_info(self(), memory), M; i(slave_pids, #q{q = #amqqueue{name = Name}}) -> - {ok, #amqqueue{mirror_nodes = MNodes, - slave_pids = SPids}} = rabbit_amqqueue:lookup(Name), - case MNodes of - undefined -> []; - _ -> SPids + case rabbit_amqqueue:lookup(Name) of + {ok, #amqqueue{mirror_nodes = undefined}} -> []; + {ok, #amqqueue{slave_pids = SPids}} -> SPids end; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); @@ -823,7 +818,7 @@ prioritise_cast(Msg, _State) -> {set_maximum_since_use, _Age} -> 8; {ack, _AckTags, _ChPid} -> 7; {reject, _AckTags, _Requeue, _ChPid} -> 7; - {notify_sent, _ChPid} -> 7; + {notify_sent, _ChPid, _Credit} -> 7; {unblock, _ChPid} -> 7; {run_backing_queue, _Mod, _Fun} -> 6; _ -> 0 @@ -1057,11 +1052,11 @@ handle_cast({unblock, ChPid}, State) -> possibly_unblock(State, ChPid, fun (C) -> C#cr{is_limit_active = false} end)); -handle_cast({notify_sent, ChPid}, State) -> +handle_cast({notify_sent, ChPid, Credit}, State) -> noreply( possibly_unblock(State, ChPid, fun (C = #cr{unsent_message_count = Count}) -> - C#cr{unsent_message_count = Count - 1} + C#cr{unsent_message_count = Count - Credit} end)); handle_cast({limit, ChPid, Limiter}, State) -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 9a6879b1..b6d38172 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -720,13 +720,14 @@ gb_trees_foreach(Fun, Tree) -> %% [{"-q",true},{"-p","/"}]} get_options(Defs, As) -> lists:foldl(fun(Def, {AsIn, RsIn}) -> - {AsOut, Value} = case Def of - {flag, Key} -> - get_flag(Key, AsIn); - {option, Key, Default} -> - get_option(Key, Default, AsIn) - end, - {AsOut, [{Key, Value} | RsIn]} + {K, {AsOut, V}} = + case Def of + {flag, Key} -> + {Key, get_flag(Key, AsIn)}; + {option, Key, Default} -> + {Key, get_option(Key, Default, AsIn)} + end, + {AsOut, [{K, V} | RsIn]} end, {As, []}, Defs). get_option(K, _Default, [K, V | As]) -> diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 4fc91860..323cf0ce 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -55,29 +55,32 @@ notify_cluster() -> {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad]) end, %% register other active rabbits with this rabbit - [ rabbit_node_monitor:rabbit_running_on(N) || N <- Nodes ], + [ rabbit_running_on(N) || N <- Nodes ], ok. %%-------------------------------------------------------------------- init([]) -> - {ok, no_state}. + {ok, ordsets:new()}. handle_call(_Request, _From, State) -> {noreply, State}. -handle_cast({rabbit_running_on, Node}, State) -> - rabbit_log:info("rabbit on ~p up~n", [Node]), - erlang:monitor(process, {rabbit, Node}), - ok = handle_live_rabbit(Node), - {noreply, State}; +handle_cast({rabbit_running_on, Node}, Nodes) -> + case ordsets:is_element(Node, Nodes) of + true -> {noreply, Nodes}; + false -> rabbit_log:info("rabbit on node ~p up~n", [Node]), + erlang:monitor(process, {rabbit, Node}), + ok = handle_live_rabbit(Node), + {noreply, ordsets:add_element(Node, Nodes)} + end; handle_cast(_Msg, State) -> {noreply, State}. -handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) -> - rabbit_log:info("node ~p lost 'rabbit'~n", [Node]), +handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, Nodes) -> + rabbit_log:info("rabbit on node ~p down~n", [Node]), ok = handle_dead_rabbit(Node), - {noreply, State}; + {noreply, ordsets:del_element(Node, Nodes)}; handle_info(_Info, State) -> {noreply, State}. diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 269128df..dc74b2f5 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -129,6 +129,9 @@ handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content}, ok = internal_send_command_async(MethodRecord, Content, State), rabbit_amqqueue:notify_sent(QPid, ChPid), State; +handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) -> + rabbit_amqqueue:notify_sent_queue_down(QPid), + State; handle_message({inet_reply, _, ok}, State) -> State; handle_message({inet_reply, _, Status}, _State) -> |