diff options
author | Robert Newson <rnewson@apache.org> | 2014-07-17 17:53:45 +0100 |
---|---|---|
committer | Robert Newson <rnewson@apache.org> | 2014-07-17 18:01:53 +0100 |
commit | 6293a8b88d1cd67c254ac764a046a1c3a0905be4 (patch) | |
tree | a71b9e79622d55c4ab5031557001536f4a10201b | |
download | couchdb-6293a8b88d1cd67c254ac764a046a1c3a0905be4.tar.gz |
Initial commit
This is substantively the work from branch 1775-feature-io-regulator
but with erlang application paraphenalia.
-rw-r--r-- | .gitignore | 2 | ||||
-rw-r--r-- | src/ioq.app.src | 21 | ||||
-rw-r--r-- | src/ioq.erl | 126 | ||||
-rw-r--r-- | src/ioq_app.erl | 21 | ||||
-rw-r--r-- | src/ioq_sup.erl | 24 |
5 files changed, 194 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..21cf3d388 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.rebar +ebin/ diff --git a/src/ioq.app.src b/src/ioq.app.src new file mode 100644 index 000000000..65ea50d6d --- /dev/null +++ b/src/ioq.app.src @@ -0,0 +1,21 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +{application,ioq, [ + {description, "I/O prioritizing engine"}, + {vsn, git}, + {registered,[]}, + {applications,[kernel,stdlib,config]}, + {mod,{ioq_app,[]}}, + {env, []}, + {modules,[ioq,ioq_app,ioq_sup]} +]}. diff --git a/src/ioq.erl b/src/ioq.erl new file mode 100644 index 000000000..686a6964c --- /dev/null +++ b/src/ioq.erl @@ -0,0 +1,126 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(ioq). +-behaviour(gen_server). + +-export([start_link/0, call/2]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]). + +-record(state, { + concurrency=10, + ratio, + interactive=queue:new(), + compaction=queue:new(), + running=[] +}). + +-record(request, { + fd, + msg, + class, + from, + ref +}). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +call(Fd, Msg) -> + Request = #request{fd=Fd, msg=Msg, class=get(io_class), from=self()}, + gen_server:call(?MODULE, Request, infinity). + +init(_) -> + Ratio = list_to_float(config:get("ioq", "ratio", "0.01")), + {ok, #state{ratio=Ratio}}. + +handle_call(#request{}=Request, From, State) -> + {noreply, enqueue_request(Request#request{from=From}, State), 0}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({Ref, Reply}, State) -> + case lists:keytake(Ref, #request.ref, State#state.running) of + {value, Request, Remaining} -> + erlang:demonitor(Ref, [flush]), + gen_server:reply(Request#request.from, Reply), + {noreply, State#state{running=Remaining}, 0}; + false -> + {noreply, State, 0} + end; + +handle_info({'DOWN', Ref, _, _, Reason}, State) -> + case lists:keytake(Ref, #request.ref, State#state.running) of + {value, Request, Remaining} -> + gen_server:reply(Request#request.from, {'EXIT', Reason}), + {noreply, State#state{running=Remaining}, 0}; + false -> + {noreply, State, 0} + end; + +handle_info(timeout, State) -> + {noreply, maybe_submit_request(State)}. + +code_change(_Vsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, _State) -> + ok. + +enqueue_request(#request{class=compaction}=Request, #state{}=State) -> + State#state{compaction=queue:in(Request, State#state.compaction)}; +enqueue_request(#request{}=Request, #state{}=State) -> + State#state{interactive=queue:in(Request, State#state.interactive)}. + +maybe_submit_request(#state{concurrency=Concurrency, running=Running}=State) + when length(Running) < Concurrency -> + case make_next_request(State) of + State -> + State; + NewState when length(Running) >= Concurrency - 1 -> + NewState; + NewState -> + maybe_submit_request(NewState) + end; +maybe_submit_request(State) -> + State. + +make_next_request(#state{}=State) -> + case {queue:is_empty(State#state.compaction), queue:is_empty(State#state.interactive)} of + {true, true} -> + State; + {true, false} -> + choose_next_request(#state.interactive, State); + {false, true} -> + choose_next_request(#state.compaction, State); + {false, false} -> + case random:uniform() < State#state.ratio of + true -> + choose_next_request(#state.compaction, State); + false -> + choose_next_request(#state.interactive, State) + end + end. + +choose_next_request(Index, State) -> + case queue:out(element(Index, State)) of + {empty, _} -> + State; + {{value, Request}, Q} -> + submit_request(Request, setelement(Index, State, Q)) + end. + +submit_request(#request{}=Request, #state{}=State) -> + Ref = erlang:monitor(process, Request#request.fd), + Request#request.fd ! {'$gen_call', {self(), Ref}, Request#request.msg}, + State#state{running = [Request#request{ref=Ref} | State#state.running]}. diff --git a/src/ioq_app.erl b/src/ioq_app.erl new file mode 100644 index 000000000..2e6d75acb --- /dev/null +++ b/src/ioq_app.erl @@ -0,0 +1,21 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(ioq_app). +-behaviour(application). +-export([start/2, stop/1]). + +start(_StartType, _StartArgs) -> + ioq_sup:start_link(). + +stop(_State) -> + ok. diff --git a/src/ioq_sup.erl b/src/ioq_sup.erl new file mode 100644 index 000000000..c4d04a9e4 --- /dev/null +++ b/src/ioq_sup.erl @@ -0,0 +1,24 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(ioq_sup). +-behaviour(supervisor). +-export([start_link/0, init/1]). + +%% Helper macro for declaring children of supervisor +-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + {ok, { {one_for_one, 5, 10}, [?CHILD(ioq, worker)]}}. |