diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-01-24 18:11:01 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-01-24 18:11:01 +0000 |
commit | 030aa84b31573804c392571d9febd3db483b361f (patch) | |
tree | c20714295d687b0be085e1263953b5043b6fc31d | |
parent | 7185a880f04f47df4d484a73af7e2e42852bc7d4 (diff) | |
download | rabbitmq-server-030aa84b31573804c392571d9febd3db483b361f.tar.gz |
Remove ?CREDIT_CPU_BOUND and have /1 versions of ack and send instead.bug24671
-rw-r--r-- | include/rabbit.hrl | 1 | ||||
-rw-r--r-- | src/credit_flow.erl | 10 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 4 |
5 files changed, 13 insertions, 6 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index ac0584eb..940ad135 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -93,7 +93,6 @@ -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). --define(CREDIT_CPU_BOUND, {200, 150}). -define(CREDIT_DISC_BOUND, {2000, 1500}). -define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]). diff --git a/src/credit_flow.erl b/src/credit_flow.erl index cfafd920..397aa191 100644 --- a/src/credit_flow.erl +++ b/src/credit_flow.erl @@ -25,7 +25,9 @@ %% senders when it is itself blocked - thus the only processes that %% need to check blocked/0 are ones that read from network sockets. --export([ack/2, handle_bump_msg/1, blocked/0, send/2]). +-define(DEFAULT_CREDIT, {200, 150}). + +-export([ack/1, ack/2, handle_bump_msg/1, blocked/0, send/1, send/2]). -export([peer_down/1]). %%---------------------------------------------------------------------------- @@ -35,9 +37,11 @@ -opaque(bump_msg() :: {pid(), non_neg_integer()}). -opaque(credit_spec() :: {non_neg_integer(), non_neg_integer()}). +-spec(ack/1 :: (pid()) -> 'ok'). -spec(ack/2 :: (pid(), credit_spec()) -> 'ok'). -spec(handle_bump_msg/1 :: (bump_msg()) -> 'ok'). -spec(blocked/0 :: () -> boolean()). +-spec(send/1 :: (pid()) -> 'ok'). -spec(send/2 :: (pid(), credit_spec()) -> 'ok'). -spec(peer_down/1 :: (pid()) -> 'ok'). @@ -56,6 +60,8 @@ %% For any given pair of processes, ack/2 and send/2 must always be %% called with the same credit_spec(). +ack(To) -> ack(To, ?DEFAULT_CREDIT). + ack(To, {MaxCredit, MoreCreditAt}) -> MoreCreditAt1 = MoreCreditAt + 1, Credit = @@ -78,6 +84,8 @@ handle_bump_msg({From, MoreCredit}) -> blocked() -> get(credit_blocked, []) =/= []. +send(From) -> send(From, ?DEFAULT_CREDIT). + send(From, {MaxCredit, _MoreCreditAt}) -> Credit = get({credit_from, From}, MaxCredit) - 1, case Credit of diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 21383905..94a99a49 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -535,7 +535,7 @@ deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}, Flow) -> %% case below. QPids = qpids(Qs), case Flow of - flow -> [credit_flow:send(QPid, ?CREDIT_CPU_BOUND) || QPid <- QPids]; + flow -> [credit_flow:send(QPid) || QPid <- QPids]; noflow -> ok end, delegate:invoke_no_result( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d6ce5c8f..f05fc336 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1032,7 +1032,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) -> undefined -> put(Key, erlang:monitor(process, Sender)); _ -> ok end, - credit_flow:ack(Sender, ?CREDIT_CPU_BOUND); + credit_flow:ack(Sender); noflow -> ok end, noreply(deliver_or_enqueue(Delivery, State)); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b2c59bce..0f6d2517 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -116,7 +116,7 @@ do(Pid, Method, Content) -> gen_server2:cast(Pid, {method, Method, Content, noflow}). do_flow(Pid, Method, Content) -> - credit_flow:send(Pid, ?CREDIT_CPU_BOUND), + credit_flow:send(Pid), gen_server2:cast(Pid, {method, Method, Content, flow}). flush(Pid) -> @@ -252,7 +252,7 @@ handle_call(_Request, _From, State) -> handle_cast({method, Method, Content, Flow}, State = #ch{conn_pid = Conn}) -> case Flow of - flow -> credit_flow:ack(Conn, ?CREDIT_CPU_BOUND); + flow -> credit_flow:ack(Conn); noflow -> ok end, try handle_method(Method, Content, State) of |