summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-01-24 18:11:01 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-01-24 18:11:01 +0000
commit030aa84b31573804c392571d9febd3db483b361f (patch)
treec20714295d687b0be085e1263953b5043b6fc31d
parent7185a880f04f47df4d484a73af7e2e42852bc7d4 (diff)
downloadrabbitmq-server-030aa84b31573804c392571d9febd3db483b361f.tar.gz
Remove ?CREDIT_CPU_BOUND and have /1 versions of ack and send instead.bug24671
-rw-r--r--include/rabbit.hrl1
-rw-r--r--src/credit_flow.erl10
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_channel.erl4
5 files changed, 13 insertions, 6 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index ac0584eb..940ad135 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -93,7 +93,6 @@
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
--define(CREDIT_CPU_BOUND, {200, 150}).
-define(CREDIT_DISC_BOUND, {2000, 1500}).
-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).
diff --git a/src/credit_flow.erl b/src/credit_flow.erl
index cfafd920..397aa191 100644
--- a/src/credit_flow.erl
+++ b/src/credit_flow.erl
@@ -25,7 +25,9 @@
%% senders when it is itself blocked - thus the only processes that
%% need to check blocked/0 are ones that read from network sockets.
--export([ack/2, handle_bump_msg/1, blocked/0, send/2]).
+-define(DEFAULT_CREDIT, {200, 150}).
+
+-export([ack/1, ack/2, handle_bump_msg/1, blocked/0, send/1, send/2]).
-export([peer_down/1]).
%%----------------------------------------------------------------------------
@@ -35,9 +37,11 @@
-opaque(bump_msg() :: {pid(), non_neg_integer()}).
-opaque(credit_spec() :: {non_neg_integer(), non_neg_integer()}).
+-spec(ack/1 :: (pid()) -> 'ok').
-spec(ack/2 :: (pid(), credit_spec()) -> 'ok').
-spec(handle_bump_msg/1 :: (bump_msg()) -> 'ok').
-spec(blocked/0 :: () -> boolean()).
+-spec(send/1 :: (pid()) -> 'ok').
-spec(send/2 :: (pid(), credit_spec()) -> 'ok').
-spec(peer_down/1 :: (pid()) -> 'ok').
@@ -56,6 +60,8 @@
%% For any given pair of processes, ack/2 and send/2 must always be
%% called with the same credit_spec().
+ack(To) -> ack(To, ?DEFAULT_CREDIT).
+
ack(To, {MaxCredit, MoreCreditAt}) ->
MoreCreditAt1 = MoreCreditAt + 1,
Credit =
@@ -78,6 +84,8 @@ handle_bump_msg({From, MoreCredit}) ->
blocked() ->
get(credit_blocked, []) =/= [].
+send(From) -> send(From, ?DEFAULT_CREDIT).
+
send(From, {MaxCredit, _MoreCreditAt}) ->
Credit = get({credit_from, From}, MaxCredit) - 1,
case Credit of
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 21383905..94a99a49 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -535,7 +535,7 @@ deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}, Flow) ->
%% case below.
QPids = qpids(Qs),
case Flow of
- flow -> [credit_flow:send(QPid, ?CREDIT_CPU_BOUND) || QPid <- QPids];
+ flow -> [credit_flow:send(QPid) || QPid <- QPids];
noflow -> ok
end,
delegate:invoke_no_result(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d6ce5c8f..f05fc336 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1032,7 +1032,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) ->
undefined -> put(Key, erlang:monitor(process, Sender));
_ -> ok
end,
- credit_flow:ack(Sender, ?CREDIT_CPU_BOUND);
+ credit_flow:ack(Sender);
noflow -> ok
end,
noreply(deliver_or_enqueue(Delivery, State));
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b2c59bce..0f6d2517 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -116,7 +116,7 @@ do(Pid, Method, Content) ->
gen_server2:cast(Pid, {method, Method, Content, noflow}).
do_flow(Pid, Method, Content) ->
- credit_flow:send(Pid, ?CREDIT_CPU_BOUND),
+ credit_flow:send(Pid),
gen_server2:cast(Pid, {method, Method, Content, flow}).
flush(Pid) ->
@@ -252,7 +252,7 @@ handle_call(_Request, _From, State) ->
handle_cast({method, Method, Content, Flow}, State = #ch{conn_pid = Conn}) ->
case Flow of
- flow -> credit_flow:ack(Conn, ?CREDIT_CPU_BOUND);
+ flow -> credit_flow:ack(Conn);
noflow -> ok
end,
try handle_method(Method, Content, State) of