diff options
Diffstat (limited to 'src/gatherer.erl')
-rw-r--r-- | src/gatherer.erl | 145 |
1 files changed, 0 insertions, 145 deletions
diff --git a/src/gatherer.erl b/src/gatherer.erl deleted file mode 100644 index 8bce1707..00000000 --- a/src/gatherer.erl +++ /dev/null @@ -1,145 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved. -%% - --module(gatherer). - --behaviour(gen_server2). - --export([start_link/0, stop/1, fork/1, finish/1, in/2, sync_in/2, out/1]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(stop/1 :: (pid()) -> 'ok'). --spec(fork/1 :: (pid()) -> 'ok'). --spec(finish/1 :: (pid()) -> 'ok'). --spec(in/2 :: (pid(), any()) -> 'ok'). --spec(sync_in/2 :: (pid(), any()) -> 'ok'). --spec(out/1 :: (pid()) -> {'value', any()} | 'empty'). - --endif. - -%%---------------------------------------------------------------------------- - --define(HIBERNATE_AFTER_MIN, 1000). --define(DESIRED_HIBERNATE, 10000). - -%%---------------------------------------------------------------------------- - --record(gstate, { forks, values, blocked }). - -%%---------------------------------------------------------------------------- - -start_link() -> - gen_server2:start_link(?MODULE, [], [{timeout, infinity}]). - -stop(Pid) -> - gen_server2:call(Pid, stop, infinity). - -fork(Pid) -> - gen_server2:call(Pid, fork, infinity). - -finish(Pid) -> - gen_server2:cast(Pid, finish). - -in(Pid, Value) -> - gen_server2:cast(Pid, {in, Value}). - -sync_in(Pid, Value) -> - gen_server2:call(Pid, {in, Value}, infinity). - -out(Pid) -> - gen_server2:call(Pid, out, infinity). - -%%---------------------------------------------------------------------------- - -init([]) -> - {ok, #gstate { forks = 0, values = queue:new(), blocked = queue:new() }, - hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. - -handle_call(stop, _From, State) -> - {stop, normal, ok, State}; - -handle_call(fork, _From, State = #gstate { forks = Forks }) -> - {reply, ok, State #gstate { forks = Forks + 1 }, hibernate}; - -handle_call({in, Value}, From, State) -> - {noreply, in(Value, From, State), hibernate}; - -handle_call(out, From, State = #gstate { forks = Forks, - values = Values, - blocked = Blocked }) -> - case queue:out(Values) of - {empty, _} when Forks == 0 -> - {reply, empty, State, hibernate}; - {empty, _} -> - {noreply, State #gstate { blocked = queue:in(From, Blocked) }, - hibernate}; - {{value, {PendingIn, Value}}, NewValues} -> - reply(PendingIn, ok), - {reply, {value, Value}, State #gstate { values = NewValues }, - hibernate} - end; - -handle_call(Msg, _From, State) -> - {stop, {unexpected_call, Msg}, State}. - -handle_cast(finish, State = #gstate { forks = Forks, blocked = Blocked }) -> - NewForks = Forks - 1, - NewBlocked = case NewForks of - 0 -> [gen_server2:reply(From, empty) || - From <- queue:to_list(Blocked)], - queue:new(); - _ -> Blocked - end, - {noreply, State #gstate { forks = NewForks, blocked = NewBlocked }, - hibernate}; - -handle_cast({in, Value}, State) -> - {noreply, in(Value, undefined, State), hibernate}; - -handle_cast(Msg, State) -> - {stop, {unexpected_cast, Msg}, State}. - -handle_info(Msg, State) -> - {stop, {unexpected_info, Msg}, State}. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -terminate(_Reason, State) -> - State. - -%%---------------------------------------------------------------------------- - -in(Value, From, State = #gstate { values = Values, blocked = Blocked }) -> - case queue:out(Blocked) of - {empty, _} -> - State #gstate { values = queue:in({From, Value}, Values) }; - {{value, PendingOut}, NewBlocked} -> - reply(From, ok), - gen_server2:reply(PendingOut, {value, Value}), - State #gstate { blocked = NewBlocked } - end. - -reply(undefined, _Reply) -> ok; -reply(From, Reply) -> gen_server2:reply(From, Reply). |