diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-01-13 16:10:28 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-01-13 16:10:28 +0000 |
commit | 031554b8334f59abfdd18f2cb87e518027eb37b3 (patch) | |
tree | 0920a5046e85247f25ba36002569e834157a3b0c | |
parent | 94769c9830e32cca3889488e122a0f1f99e12b78 (diff) | |
download | rabbitmq-server-031554b8334f59abfdd18f2cb87e518027eb37b3.tar.gz |
rabbit_flow -> credit_flow
-rw-r--r-- | src/credit_flow.erl (renamed from src/rabbit_flow.erl) | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 4 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 8 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 4 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 8 |
5 files changed, 13 insertions, 13 deletions
diff --git a/src/rabbit_flow.erl b/src/credit_flow.erl index 5e327ff8..60644110 100644 --- a/src/rabbit_flow.erl +++ b/src/credit_flow.erl @@ -14,7 +14,7 @@ %% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. %% --module(rabbit_flow). +-module(credit_flow). %% Credit starts at ?MAX_CREDIT and goes down. Both sides keep %% track. When the receiver goes below ?MORE_CREDIT_AT it issues more diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index fee63378..0a426733 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -644,7 +644,7 @@ handle_ch_publisher_down(DownPid) -> case lookup_ch_publisher(DownPid) of not_found -> ok; _ -> erase_ch_record_publisher(DownPid), - rabbit_flow:sender_down(DownPid) + credit_flow:sender_down(DownPid) end. check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) -> @@ -1051,7 +1051,7 @@ handle_cast({run_backing_queue, Mod, Fun}, State) -> handle_cast({deliver, Delivery = #delivery{sender = Sender}}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. ch_record_publisher(Sender), - rabbit_flow:ack(Sender), + credit_flow:ack(Sender), noreply(deliver_or_enqueue(Delivery, State)); handle_cast({ack, AckTags, ChPid}, State) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index db9ff7a0..66515a11 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -247,7 +247,7 @@ handle_call(_Request, _From, State) -> handle_cast({method, Method, Content}, State = #ch{conn_pid = Conn}) -> case Content of none -> ok; - _ -> rabbit_flow:ack(Conn) + _ -> credit_flow:ack(Conn) end, try handle_method(Method, Content, State) of {reply, Reply, NewState} -> @@ -320,7 +320,7 @@ handle_cast({confirm, MsgSeqNos, From}, State) -> noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end). handle_info({bump_credit, Msg}, State) -> - rabbit_flow:handle_bump_msg(Msg), + credit_flow:handle_bump_msg(Msg), noreply(State); handle_info(timeout, State) -> @@ -335,7 +335,7 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State1 = handle_publishing_queue_down(QPid, Reason, State), State2 = queue_blocked(QPid, State1), State3 = handle_consuming_queue_down(QPid, State2), - rabbit_flow:receiver_down(QPid), + credit_flow:receiver_down(QPid), erase_queue_stats(QPid), noreply(State3#ch{queue_monitors = sets:del_element(QPid, State3#ch.queue_monitors)}); @@ -1352,7 +1352,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ {RoutingRes, DeliveredQPids} = rabbit_router:deliver(QNames, Delivery), State1 = lists:foldl(fun monitor_queue/2, State, DeliveredQPids), case Mandatory orelse Immediate of - false -> [rabbit_flow:send(QPid) || QPid <- DeliveredQPids]; + false -> [credit_flow:send(QPid) || QPid <- DeliveredQPids]; _ -> ok end, State2 = process_routing_result(RoutingRes, DeliveredQPids, diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 9f6773b2..5a2a7e66 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -210,7 +210,7 @@ handle_cast({gm, Instruction}, State) -> handle_cast({deliver, Delivery = #delivery{sender = Sender}}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - rabbit_flow:ack(Sender), + credit_flow:ack(Sender), noreply(maybe_enqueue_message(Delivery, true, State)); handle_cast({set_maximum_since_use, Age}, State) -> @@ -602,7 +602,7 @@ ensure_monitoring(ChPid, State = #state { known_senders = KS }) -> local_sender_death(ChPid, State = #state { known_senders = KS }) -> ok = case dict:is_key(ChPid, KS) of false -> ok; - true -> rabbit_flow:sender_down(ChPid), + true -> credit_flow:sender_down(ChPid), confirm_sender_death(ChPid) end, State. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index ed48575a..e1e3eb6a 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -347,7 +347,7 @@ handle_other(emit_stats, Deb, State) -> handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); handle_other({bump_credit, Msg}, Deb, State) -> - rabbit_flow:handle_bump_msg(Msg), + credit_flow:handle_bump_msg(Msg), recvloop(Deb, control_throttle(State)); handle_other(Other, _Deb, _State) -> %% internal error -> something worth dying for @@ -365,7 +365,7 @@ terminate(_Explanation, State) -> control_throttle(State = #v1{connection_state = CS, conserve_memory = Mem}) -> - case {CS, Mem orelse rabbit_flow:blocked()} of + case {CS, Mem orelse credit_flow:blocked()} of {running, true} -> State#v1{connection_state = blocking}; {blocking, false} -> State#v1{connection_state = running}; {blocked, false} -> ok = rabbit_heartbeat:resume_monitor( @@ -419,7 +419,7 @@ handle_dependent_exit(ChPid, Reason, State) -> channel_cleanup(ChPid) -> case get({ch_pid, ChPid}) of undefined -> undefined; - {Channel, MRef} -> rabbit_flow:receiver_down(ChPid), + {Channel, MRef} -> credit_flow:receiver_down(ChPid), erase({channel, Channel}), erase({ch_pid, ChPid}), erlang:demonitor(MRef, [flush]), @@ -927,7 +927,7 @@ process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) -> {ok, NewAState} -> NewAState; {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method), NewAState; - {ok, Method, Content, NewAState} -> rabbit_flow:send(ChPid), + {ok, Method, Content, NewAState} -> credit_flow:send(ChPid), rabbit_channel:do(ChPid, Method, Content), NewAState; |