diff options
author | Adam Kocoloski <kocolosk@apache.org> | 2020-01-06 15:32:46 -0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-06 15:32:46 -0600 |
commit | 7b098924801869d834e4df52713f004bf9ee7b3d (patch) | |
tree | 30a1b35dd7608bfa0828823ccaca2c61a9631653 | |
parent | c8a5757c33c8f782d935fce2f804cb745bf77b4a (diff) | |
parent | 7c12f14679a7b91abe287cc10bf328a595bf8d33 (diff) | |
download | couchdb-7b098924801869d834e4df52713f004bf9ee7b3d.tar.gz |
Merge pull request #2408 from apache/ioq-in-tree
Bring IOQ in tree
-rw-r--r-- | rebar.config.script | 3 | ||||
-rw-r--r-- | rel/overlay/etc/default.ini | 34 | ||||
-rw-r--r-- | rel/reltool.config | 2 | ||||
-rw-r--r-- | src/ioq/.gitignore | 2 | ||||
-rw-r--r-- | src/ioq/src/ioq.app.src | 21 | ||||
-rw-r--r-- | src/ioq/src/ioq.erl | 189 | ||||
-rw-r--r-- | src/ioq/src/ioq_app.erl | 21 | ||||
-rw-r--r-- | src/ioq/src/ioq_sup.erl | 24 |
8 files changed, 292 insertions, 4 deletions
diff --git a/rebar.config.script b/rebar.config.script index dace4e26c..5d5a6aac3 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -131,6 +131,7 @@ SubDirs = [ "src/dreyfus", "src/fabric", "src/global_changes", + "src/ioq", "src/ken", "src/mango", "src/rexi", @@ -146,8 +147,6 @@ DepDescs = [ {ets_lru, "ets-lru", {tag, "1.0.0"}}, {khash, "khash", {tag, "1.0.1"}}, {snappy, "snappy", {tag, "CouchDB-1.0.4"}}, -{ioq, "ioq", {tag, "2.1.2"}}, -{hqueue, "hqueue", {tag, "CouchDB-1.0.1-1"}}, %% Non-Erlang deps {docs, {url, "https://github.com/apache/couchdb-documentation"}, diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index a1df0805a..f5f057859 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -512,6 +512,40 @@ min_priority = 2.0 [smoosh.ratio_views] min_priority = 2.0 +[ioq] +; The maximum number of concurrent in-flight IO requests that the queue will submit +concurrency = 10 + +; The fraction of the time that a background IO request will be selected +; over an interactive IO request when both queues are non-empty +ratio = 0.01 + +[ioq.bypass] +; System administrators can choose to submit specific classes of IO directly +; to the underlying file descriptor or OS process, bypassing the queues +; altogether. Installing a bypass can yield higher throughput and lower +; latency, but relinquishes some control over prioritization. 
The following +; classes are recognized with the following defaults: + +; Messages on their way to an external process (e.g., couchjs) are bypassed +os_process = true + +; Disk IO fulfilling interactive read requests is bypassed +read = true + +; Disk IO required to update a database is bypassed +write = true + +; Disk IO required to update views and other secondary indexes is bypassed +view_update = true + +; Disk IO issued by the background replication processes that fix any +; inconsistencies between shard copies is queued +shard_sync = false + +; Disk IO issued by compaction jobs is queued +compaction = false + [dreyfus] ; The name and location of the Clouseau Java service required to ; enable Search functionality. diff --git a/rel/reltool.config b/rel/reltool.config index 512e45c44..1be2ccc9a 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -47,7 +47,6 @@ fabric, folsom, global_changes, - hqueue, hyper, ibrowse, ioq, @@ -105,7 +104,6 @@ {app, fabric, [{incl_cond, include}]}, {app, folsom, [{incl_cond, include}]}, {app, global_changes, [{incl_cond, include}]}, - {app, hqueue, [{incl_cond, include}]}, {app, hyper, [{incl_cond, include}]}, {app, ibrowse, [{incl_cond, include}]}, {app, ioq, [{incl_cond, include}]}, diff --git a/src/ioq/.gitignore b/src/ioq/.gitignore new file mode 100644 index 000000000..21cf3d388 --- /dev/null +++ b/src/ioq/.gitignore @@ -0,0 +1,2 @@ +.rebar +ebin/ diff --git a/src/ioq/src/ioq.app.src b/src/ioq/src/ioq.app.src new file mode 100644 index 000000000..65ea50d6d --- /dev/null +++ b/src/ioq/src/ioq.app.src @@ -0,0 +1,21 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. 
You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +{application,ioq, [ + {description, "I/O prioritizing engine"}, + {vsn, git}, + {registered,[]}, + {applications,[kernel,stdlib,config]}, + {mod,{ioq_app,[]}}, + {env, []}, + {modules,[ioq,ioq_app,ioq_sup]} +]}. diff --git a/src/ioq/src/ioq.erl b/src/ioq/src/ioq.erl new file mode 100644 index 000000000..81d94a36f --- /dev/null +++ b/src/ioq/src/ioq.erl @@ -0,0 +1,189 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(ioq). +-behaviour(gen_server). +-behaviour(config_listener). + +-export([start_link/0, call/3]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]). + +% config_listener api +-export([handle_config_change/5, handle_config_terminate/3]). + +-define(RELISTEN_DELAY, 5000). + +-record(state, { + concurrency, + ratio, + interactive=queue:new(), + background=queue:new(), + running=[] +}). + +-record(request, { + fd, + msg, + priority, + from, + ref +}). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). 
+ +call(Fd, Msg, Metadata) -> + Priority = io_class(Msg, Metadata), + case bypass(Priority) of + true -> + gen_server:call(Fd, Msg); + false -> + queued_call(Fd, Msg, Priority) + end. + +bypass(Priority) -> + config:get("ioq.bypass", atom_to_list(Priority)) =:= "true". + +io_class({prompt, _}, _) -> + os_process; +io_class({data, _}, _) -> + os_process; +io_class(_, {interactive, _}) -> + read; +io_class(_, {db_update, _}) -> + write; +io_class(_, {view_update, _, _}) -> + view_update; +io_class(_, {internal_repl, _}) -> + shard_sync; +io_class(_, {db_compact, _}) -> + compaction; +io_class(_, {view_compact, _, _}) -> + compaction; +io_class(_, _) -> + other. + +queued_call(Fd, Msg, Priority) -> + Request = #request{fd=Fd, msg=Msg, priority=Priority, from=self()}, + try + gen_server:call(?MODULE, Request, infinity) + catch + exit:{noproc,_} -> + gen_server:call(Fd, Msg, infinity) + end. + +init(_) -> + ok = config:listen_for_changes(?MODULE, nil), + State = #state{}, + {ok, read_config(State)}. + +read_config(State) -> + Ratio = list_to_float(config:get("ioq", "ratio", "0.01")), + Concurrency = list_to_integer(config:get("ioq", "concurrency", "10")), + State#state{concurrency=Concurrency, ratio=Ratio}. + +handle_call(#request{}=Request, From, State) -> + {noreply, enqueue_request(Request#request{from=From}, State), 0}. + +handle_cast(change, State) -> + {noreply, read_config(State)}; +handle_cast(_Msg, State) -> + {noreply, State}. 
+ +handle_info({Ref, Reply}, State) -> + case lists:keytake(Ref, #request.ref, State#state.running) of + {value, Request, Remaining} -> + erlang:demonitor(Ref, [flush]), + gen_server:reply(Request#request.from, Reply), + {noreply, State#state{running=Remaining}, 0}; + false -> + {noreply, State, 0} + end; +handle_info({'DOWN', Ref, _, _, Reason}, State) -> + case lists:keytake(Ref, #request.ref, State#state.running) of + {value, Request, Remaining} -> + gen_server:reply(Request#request.from, {'EXIT', Reason}), + {noreply, State#state{running=Remaining}, 0}; + false -> + {noreply, State, 0} + end; +handle_info(restart_config_listener, State) -> + ok = config:listen_for_changes(?MODULE, nil), + {noreply, State}; +handle_info(timeout, State) -> + {noreply, maybe_submit_request(State)}. + +handle_config_change("ioq", _, _, _, _) -> + {ok, gen_server:cast(?MODULE, change)}; +handle_config_change(_, _, _, _, _) -> + {ok, nil}. + +handle_config_terminate(_Server, stop, _State) -> + ok; +handle_config_terminate(_Server, _Reason, _State) -> + erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener). + +code_change(_Vsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, _State) -> + ok. + +enqueue_request(#request{priority=compaction}=Request, #state{}=State) -> + State#state{background=queue:in(Request, State#state.background)}; +enqueue_request(#request{priority=shard_sync}=Request, #state{}=State) -> + State#state{background=queue:in(Request, State#state.background)}; +enqueue_request(#request{}=Request, #state{}=State) -> + State#state{interactive=queue:in(Request, State#state.interactive)}. + +maybe_submit_request(#state{concurrency=Concurrency, running=Running}=State) + when length(Running) < Concurrency -> + case make_next_request(State) of + State -> + State; + NewState when length(Running) >= Concurrency - 1 -> + NewState; + NewState -> + maybe_submit_request(NewState) + end; +maybe_submit_request(State) -> + State. 
+ +make_next_request(#state{}=State) -> + case {queue:is_empty(State#state.background), queue:is_empty(State#state.interactive)} of + {true, true} -> + State; + {true, false} -> + choose_next_request(#state.interactive, State); + {false, true} -> + choose_next_request(#state.background, State); + {false, false} -> + case couch_rand:uniform() < State#state.ratio of + true -> + choose_next_request(#state.background, State); + false -> + choose_next_request(#state.interactive, State) + end + end. + +choose_next_request(Index, State) -> + case queue:out(element(Index, State)) of + {empty, _} -> + State; + {{value, Request}, Q} -> + submit_request(Request, setelement(Index, State, Q)) + end. + +submit_request(#request{}=Request, #state{}=State) -> + Ref = erlang:monitor(process, Request#request.fd), + Request#request.fd ! {'$gen_call', {self(), Ref}, Request#request.msg}, + State#state{running = [Request#request{ref=Ref} | State#state.running]}. diff --git a/src/ioq/src/ioq_app.erl b/src/ioq/src/ioq_app.erl new file mode 100644 index 000000000..2e6d75acb --- /dev/null +++ b/src/ioq/src/ioq_app.erl @@ -0,0 +1,21 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(ioq_app). +-behaviour(application). +-export([start/2, stop/1]). + +start(_StartType, _StartArgs) -> + ioq_sup:start_link(). + +stop(_State) -> + ok. 
diff --git a/src/ioq/src/ioq_sup.erl b/src/ioq/src/ioq_sup.erl new file mode 100644 index 000000000..c4d04a9e4 --- /dev/null +++ b/src/ioq/src/ioq_sup.erl @@ -0,0 +1,24 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(ioq_sup). +-behaviour(supervisor). +-export([start_link/0, init/1]). + +%% Helper macro for declaring children of supervisor +-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + {ok, { {one_for_one, 5, 10}, [?CHILD(ioq, worker)]}}. |