diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2010-12-09 21:07:58 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-12-09 21:07:58 +0000 |
commit | ceab56de2e586d636978d0e84e74484079bfc4c2 (patch) | |
tree | 480eaa01571f93ebad8d77653a39a5a9f342fd6d | |
parent | 2a4cd27723a8eae3d9c41db2bfebf09ecff40956 (diff) | |
download | rabbitmq-server-ceab56de2e586d636978d0e84e74484079bfc4c2.tar.gz |
cosmetic: put methods in more sensible order
-rw-r--r-- | src/rabbit_channel.erl | 113 |
1 files changed, 57 insertions, 56 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 8df8a974..82ba510c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,10 +35,10 @@ -behaviour(gen_server2). --export([start_link/7, do/2, do/3, shutdown/1]). --export([send_command/2, deliver/4, flushed/2]). +-export([start_link/7, do/2, do/3, flush/1, shutdown/1]). +-export([send_command/2, deliver/4, flushed/2, confirm/2, flush_confirms/1]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). --export([emit_stats/1, flush/1, flush_confirms/1, confirm/2]). +-export([emit_stats/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, @@ -90,12 +90,15 @@ -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'). +-spec(flush/1 :: (pid()) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(deliver/4 :: (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). +-spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok'). +-spec(flush_confirms/1 :: (pid()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (pid()) -> rabbit_types:infos()). @@ -103,8 +106,6 @@ -spec(info_all/0 :: () -> [rabbit_types:infos()]). -spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(emit_stats/1 :: (pid()) -> 'ok'). --spec(flush_confirms/1 :: (pid()) -> 'ok'). --spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok'). -endif. @@ -121,6 +122,9 @@ do(Pid, Method) -> do(Pid, Method, Content) -> gen_server2:cast(Pid, {method, Method, Content}). +flush(Pid) -> + gen_server2:call(Pid, flush). + shutdown(Pid) -> gen_server2:cast(Pid, terminate). @@ -133,6 +137,12 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). +confirm(Pid, MsgSeqNo) -> + gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}). + +flush_confirms(Pid) -> + gen_server2:cast(Pid, flush_confirms). + list() -> pg_local:get_members(rabbit_channels). @@ -156,15 +166,6 @@ info_all(Items) -> emit_stats(Pid) -> gen_server2:cast(Pid, emit_stats). -flush(Pid) -> - gen_server2:call(Pid, flush). - -flush_confirms(Pid) -> - gen_server2:cast(Pid, flush_confirms). - -confirm(Pid, MsgSeqNo) -> - gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}). - %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, @@ -215,6 +216,9 @@ prioritise_cast(Msg, _State) -> _ -> 0 end. +handle_call(flush, _From, State) -> + reply(ok, State); + handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State); @@ -224,9 +228,6 @@ handle_call({info, Items}, _From, State) -> catch Error -> reply({error, Error}, State) end; -handle_call(flush, _From, State) -> - reply(ok, State); - handle_call(_Request, _From, State) -> noreply(State). @@ -1250,6 +1251,45 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> lock_message(false, _MsgStruct, State) -> State. +start_confirm_timer(State = #ch{confirm_tref = undefined}) -> + {ok, TRef} = timer:apply_after(?FLUSH_CONFIRMS_INTERVAL, + ?MODULE, flush_confirms, [self()]), + State#ch{confirm_tref = TRef}; +start_confirm_timer(State) -> + State. + +stop_confirm_timer(State = #ch{confirm_tref = undefined}) -> + State; +stop_confirm_timer(State = #ch{confirm_tref = TRef}) -> + {ok, cancel} = timer:cancel(TRef), + State#ch{confirm_tref = undefined}. + +internal_flush_confirms(State = #ch{writer_pid = WriterPid, + held_confirms = Cs}) -> + case gb_sets:is_empty(Cs) of + true -> State#ch{confirm_tref = undefined}; + false -> [First | Rest] = gb_sets:to_list(Cs), + {Mult, Inds} = find_consecutive_sequence(First, Rest), + ok = rabbit_writer:send_command( + WriterPid, + #'basic.ack'{delivery_tag = Mult, multiple = true}), + ok = lists:foldl( + fun(T, ok) -> rabbit_writer:send_command( + WriterPid, + #'basic.ack'{delivery_tag = T}) + end, ok, Inds), + State#ch{held_confirms = gb_sets:new(), + confirm_tref = undefined} + end. + +%% Find longest sequence of consecutive numbers at the beginning. +find_consecutive_sequence(Last, []) -> + {Last, []}; +find_consecutive_sequence(Last, [N | Ns]) when N == (Last + 1) -> + find_consecutive_sequence(N, Ns); +find_consecutive_sequence(Last, Ns) -> + {Last, Ns}. + terminate(State) -> stop_confirm_timer(State), pg_local:leave(rabbit_channels, self()), @@ -1337,42 +1377,3 @@ erase_queue_stats(QPid) -> erase({queue_stats, QPid}), [erase({queue_exchange_stats, QX}) || {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0]. - -start_confirm_timer(State = #ch{confirm_tref = undefined}) -> - {ok, TRef} = timer:apply_after(?FLUSH_CONFIRMS_INTERVAL, - ?MODULE, flush_confirms, [self()]), - State#ch{confirm_tref = TRef}; -start_confirm_timer(State) -> - State. - -stop_confirm_timer(State = #ch{confirm_tref = undefined}) -> - State; -stop_confirm_timer(State = #ch{confirm_tref = TRef}) -> - {ok, cancel} = timer:cancel(TRef), - State#ch{confirm_tref = undefined}. - -internal_flush_confirms(State = #ch{writer_pid = WriterPid, - held_confirms = Cs}) -> - case gb_sets:is_empty(Cs) of - true -> State#ch{confirm_tref = undefined}; - false -> [First | Rest] = gb_sets:to_list(Cs), - {Mult, Inds} = find_consecutive_sequence(First, Rest), - ok = rabbit_writer:send_command( - WriterPid, - #'basic.ack'{delivery_tag = Mult, multiple = true}), - ok = lists:foldl( - fun(T, ok) -> rabbit_writer:send_command( - WriterPid, - #'basic.ack'{delivery_tag = T}) - end, ok, Inds), - State#ch{held_confirms = gb_sets:new(), - confirm_tref = undefined} - end. - -%% Find longest sequence of consecutive numbers at the beginning. -find_consecutive_sequence(Last, []) -> - {Last, []}; -find_consecutive_sequence(Last, [N | Ns]) when N == (Last + 1) -> - find_consecutive_sequence(N, Ns); -find_consecutive_sequence(Last, Ns) -> - {Last, Ns}. |