summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-01-13 16:10:28 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-01-13 16:10:28 +0000
commit031554b8334f59abfdd18f2cb87e518027eb37b3 (patch)
tree0920a5046e85247f25ba36002569e834157a3b0c
parent94769c9830e32cca3889488e122a0f1f99e12b78 (diff)
downloadrabbitmq-server-031554b8334f59abfdd18f2cb87e518027eb37b3.tar.gz
rabbit_flow -> credit_flow
-rw-r--r--src/credit_flow.erl (renamed from src/rabbit_flow.erl)2
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_channel.erl8
-rw-r--r--src/rabbit_mirror_queue_slave.erl4
-rw-r--r--src/rabbit_reader.erl8
5 files changed, 13 insertions, 13 deletions
diff --git a/src/rabbit_flow.erl b/src/credit_flow.erl
index 5e327ff8..60644110 100644
--- a/src/rabbit_flow.erl
+++ b/src/credit_flow.erl
@@ -14,7 +14,7 @@
%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
%%
--module(rabbit_flow).
+-module(credit_flow).
%% Credit starts at ?MAX_CREDIT and goes down. Both sides keep
%% track. When the receiver goes below ?MORE_CREDIT_AT it issues more
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index fee63378..0a426733 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -644,7 +644,7 @@ handle_ch_publisher_down(DownPid) ->
case lookup_ch_publisher(DownPid) of
not_found -> ok;
_ -> erase_ch_record_publisher(DownPid),
- rabbit_flow:sender_down(DownPid)
+ credit_flow:sender_down(DownPid)
end.
check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) ->
@@ -1051,7 +1051,7 @@ handle_cast({run_backing_queue, Mod, Fun}, State) ->
handle_cast({deliver, Delivery = #delivery{sender = Sender}}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
ch_record_publisher(Sender),
- rabbit_flow:ack(Sender),
+ credit_flow:ack(Sender),
noreply(deliver_or_enqueue(Delivery, State));
handle_cast({ack, AckTags, ChPid}, State) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index db9ff7a0..66515a11 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -247,7 +247,7 @@ handle_call(_Request, _From, State) ->
handle_cast({method, Method, Content}, State = #ch{conn_pid = Conn}) ->
case Content of
none -> ok;
- _ -> rabbit_flow:ack(Conn)
+ _ -> credit_flow:ack(Conn)
end,
try handle_method(Method, Content, State) of
{reply, Reply, NewState} ->
@@ -320,7 +320,7 @@ handle_cast({confirm, MsgSeqNos, From}, State) ->
noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end).
handle_info({bump_credit, Msg}, State) ->
- rabbit_flow:handle_bump_msg(Msg),
+ credit_flow:handle_bump_msg(Msg),
noreply(State);
handle_info(timeout, State) ->
@@ -335,7 +335,7 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
State2 = queue_blocked(QPid, State1),
State3 = handle_consuming_queue_down(QPid, State2),
- rabbit_flow:receiver_down(QPid),
+ credit_flow:receiver_down(QPid),
erase_queue_stats(QPid),
noreply(State3#ch{queue_monitors =
sets:del_element(QPid, State3#ch.queue_monitors)});
@@ -1352,7 +1352,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
{RoutingRes, DeliveredQPids} = rabbit_router:deliver(QNames, Delivery),
State1 = lists:foldl(fun monitor_queue/2, State, DeliveredQPids),
case Mandatory orelse Immediate of
- false -> [rabbit_flow:send(QPid) || QPid <- DeliveredQPids];
+ false -> [credit_flow:send(QPid) || QPid <- DeliveredQPids];
_ -> ok
end,
State2 = process_routing_result(RoutingRes, DeliveredQPids,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 9f6773b2..5a2a7e66 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -210,7 +210,7 @@ handle_cast({gm, Instruction}, State) ->
handle_cast({deliver, Delivery = #delivery{sender = Sender}}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- rabbit_flow:ack(Sender),
+ credit_flow:ack(Sender),
noreply(maybe_enqueue_message(Delivery, true, State));
handle_cast({set_maximum_since_use, Age}, State) ->
@@ -602,7 +602,7 @@ ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
local_sender_death(ChPid, State = #state { known_senders = KS }) ->
ok = case dict:is_key(ChPid, KS) of
false -> ok;
- true -> rabbit_flow:sender_down(ChPid),
+ true -> credit_flow:sender_down(ChPid),
confirm_sender_death(ChPid)
end,
State.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index ed48575a..e1e3eb6a 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -347,7 +347,7 @@ handle_other(emit_stats, Deb, State) ->
handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State);
handle_other({bump_credit, Msg}, Deb, State) ->
- rabbit_flow:handle_bump_msg(Msg),
+ credit_flow:handle_bump_msg(Msg),
recvloop(Deb, control_throttle(State));
handle_other(Other, _Deb, _State) ->
%% internal error -> something worth dying for
@@ -365,7 +365,7 @@ terminate(_Explanation, State) ->
control_throttle(State = #v1{connection_state = CS,
conserve_memory = Mem}) ->
- case {CS, Mem orelse rabbit_flow:blocked()} of
+ case {CS, Mem orelse credit_flow:blocked()} of
{running, true} -> State#v1{connection_state = blocking};
{blocking, false} -> State#v1{connection_state = running};
{blocked, false} -> ok = rabbit_heartbeat:resume_monitor(
@@ -419,7 +419,7 @@ handle_dependent_exit(ChPid, Reason, State) ->
channel_cleanup(ChPid) ->
case get({ch_pid, ChPid}) of
undefined -> undefined;
- {Channel, MRef} -> rabbit_flow:receiver_down(ChPid),
+ {Channel, MRef} -> credit_flow:receiver_down(ChPid),
erase({channel, Channel}),
erase({ch_pid, ChPid}),
erlang:demonitor(MRef, [flush]),
@@ -927,7 +927,7 @@ process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) ->
{ok, NewAState} -> NewAState;
{ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
NewAState;
- {ok, Method, Content, NewAState} -> rabbit_flow:send(ChPid),
+ {ok, Method, Content, NewAState} -> credit_flow:send(ChPid),
rabbit_channel:do(ChPid,
Method, Content),
NewAState;