summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Newson <rnewson@apache.org>2014-07-17 17:53:45 +0100
committerRobert Newson <rnewson@apache.org>2014-07-17 18:01:53 +0100
commit6293a8b88d1cd67c254ac764a046a1c3a0905be4 (patch)
treea71b9e79622d55c4ab5031557001536f4a10201b
downloadcouchdb-6293a8b88d1cd67c254ac764a046a1c3a0905be4.tar.gz
Initial commit
This is substantively the work from branch 1775-feature-io-regulator but with erlang application paraphenalia.
-rw-r--r--.gitignore2
-rw-r--r--src/ioq.app.src21
-rw-r--r--src/ioq.erl126
-rw-r--r--src/ioq_app.erl21
-rw-r--r--src/ioq_sup.erl24
5 files changed, 194 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 000000000..21cf3d388
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+.rebar
+ebin/
diff --git a/src/ioq.app.src b/src/ioq.app.src
new file mode 100644
index 000000000..65ea50d6d
--- /dev/null
+++ b/src/ioq.app.src
@@ -0,0 +1,21 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+{application,ioq, [
+ {description, "I/O prioritizing engine"},
+ {vsn, git},
+ {registered,[]},
+ {applications,[kernel,stdlib,config]},
+ {mod,{ioq_app,[]}},
+ {env, []},
+ {modules,[ioq,ioq_app,ioq_sup]}
+]}.
diff --git a/src/ioq.erl b/src/ioq.erl
new file mode 100644
index 000000000..686a6964c
--- /dev/null
+++ b/src/ioq.erl
@@ -0,0 +1,126 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ioq).
+-behaviour(gen_server).
+
+-export([start_link/0, call/2]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]).
+
+-record(state, {
+ concurrency=10,
+ ratio,
+ interactive=queue:new(),
+ compaction=queue:new(),
+ running=[]
+}).
+
+-record(request, {
+ fd,
+ msg,
+ class,
+ from,
+ ref
+}).
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+call(Fd, Msg) ->
+ Request = #request{fd=Fd, msg=Msg, class=get(io_class), from=self()},
+ gen_server:call(?MODULE, Request, infinity).
+
+init(_) ->
+ Ratio = list_to_float(config:get("ioq", "ratio", "0.01")),
+ {ok, #state{ratio=Ratio}}.
+
+handle_call(#request{}=Request, From, State) ->
+ {noreply, enqueue_request(Request#request{from=From}, State), 0}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({Ref, Reply}, State) ->
+ case lists:keytake(Ref, #request.ref, State#state.running) of
+ {value, Request, Remaining} ->
+ erlang:demonitor(Ref, [flush]),
+ gen_server:reply(Request#request.from, Reply),
+ {noreply, State#state{running=Remaining}, 0};
+ false ->
+ {noreply, State, 0}
+ end;
+
+handle_info({'DOWN', Ref, _, _, Reason}, State) ->
+ case lists:keytake(Ref, #request.ref, State#state.running) of
+ {value, Request, Remaining} ->
+ gen_server:reply(Request#request.from, {'EXIT', Reason}),
+ {noreply, State#state{running=Remaining}, 0};
+ false ->
+ {noreply, State, 0}
+ end;
+
+handle_info(timeout, State) ->
+ {noreply, maybe_submit_request(State)}.
+
+code_change(_Vsn, State, _Extra) ->
+ {ok, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+enqueue_request(#request{class=compaction}=Request, #state{}=State) ->
+ State#state{compaction=queue:in(Request, State#state.compaction)};
+enqueue_request(#request{}=Request, #state{}=State) ->
+ State#state{interactive=queue:in(Request, State#state.interactive)}.
+
+maybe_submit_request(#state{concurrency=Concurrency, running=Running}=State)
+ when length(Running) < Concurrency ->
+ case make_next_request(State) of
+ State ->
+ State;
+ NewState when length(Running) >= Concurrency - 1 ->
+ NewState;
+ NewState ->
+ maybe_submit_request(NewState)
+ end;
+maybe_submit_request(State) ->
+ State.
+
+make_next_request(#state{}=State) ->
+ case {queue:is_empty(State#state.compaction), queue:is_empty(State#state.interactive)} of
+ {true, true} ->
+ State;
+ {true, false} ->
+ choose_next_request(#state.interactive, State);
+ {false, true} ->
+ choose_next_request(#state.compaction, State);
+ {false, false} ->
+ case random:uniform() < State#state.ratio of
+ true ->
+ choose_next_request(#state.compaction, State);
+ false ->
+ choose_next_request(#state.interactive, State)
+ end
+ end.
+
+choose_next_request(Index, State) ->
+ case queue:out(element(Index, State)) of
+ {empty, _} ->
+ State;
+ {{value, Request}, Q} ->
+ submit_request(Request, setelement(Index, State, Q))
+ end.
+
+submit_request(#request{}=Request, #state{}=State) ->
+ Ref = erlang:monitor(process, Request#request.fd),
+ Request#request.fd ! {'$gen_call', {self(), Ref}, Request#request.msg},
+ State#state{running = [Request#request{ref=Ref} | State#state.running]}.
diff --git a/src/ioq_app.erl b/src/ioq_app.erl
new file mode 100644
index 000000000..2e6d75acb
--- /dev/null
+++ b/src/ioq_app.erl
@@ -0,0 +1,21 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ioq_app).
+-behaviour(application).
+-export([start/2, stop/1]).
+
+start(_StartType, _StartArgs) ->
+ ioq_sup:start_link().
+
+stop(_State) ->
+ ok.
diff --git a/src/ioq_sup.erl b/src/ioq_sup.erl
new file mode 100644
index 000000000..c4d04a9e4
--- /dev/null
+++ b/src/ioq_sup.erl
@@ -0,0 +1,24 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ioq_sup).
+-behaviour(supervisor).
+-export([start_link/0, init/1]).
+
+%% Helper macro for declaring children of supervisor
+-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+ {ok, { {one_for_one, 5, 10}, [?CHILD(ioq, worker)]}}.