From 6293a8b88d1cd67c254ac764a046a1c3a0905be4 Mon Sep 17 00:00:00 2001 From: Robert Newson Date: Thu, 17 Jul 2014 17:53:45 +0100 Subject: Initial commit This is substantively the work from branch 1775-feature-io-regulator but with erlang application paraphenalia. --- .gitignore | 2 + src/ioq.app.src | 21 ++++++++++ src/ioq.erl | 126 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/ioq_app.erl | 21 ++++++++++ src/ioq_sup.erl | 24 +++++++++++ 5 files changed, 194 insertions(+) create mode 100644 .gitignore create mode 100644 src/ioq.app.src create mode 100644 src/ioq.erl create mode 100644 src/ioq_app.erl create mode 100644 src/ioq_sup.erl diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..21cf3d388 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.rebar +ebin/ diff --git a/src/ioq.app.src b/src/ioq.app.src new file mode 100644 index 000000000..65ea50d6d --- /dev/null +++ b/src/ioq.app.src @@ -0,0 +1,21 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +{application,ioq, [ + {description, "I/O prioritizing engine"}, + {vsn, git}, + {registered,[]}, + {applications,[kernel,stdlib,config]}, + {mod,{ioq_app,[]}}, + {env, []}, + {modules,[ioq,ioq_app,ioq_sup]} +]}. diff --git a/src/ioq.erl b/src/ioq.erl new file mode 100644 index 000000000..686a6964c --- /dev/null +++ b/src/ioq.erl @@ -0,0 +1,126 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(ioq). +-behaviour(gen_server). + +-export([start_link/0, call/2]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]). + +-record(state, { + concurrency=10, + ratio, + interactive=queue:new(), + compaction=queue:new(), + running=[] +}). + +-record(request, { + fd, + msg, + class, + from, + ref +}). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +call(Fd, Msg) -> + Request = #request{fd=Fd, msg=Msg, class=get(io_class), from=self()}, + gen_server:call(?MODULE, Request, infinity). + +init(_) -> + Ratio = list_to_float(config:get("ioq", "ratio", "0.01")), + {ok, #state{ratio=Ratio}}. + +handle_call(#request{}=Request, From, State) -> + {noreply, enqueue_request(Request#request{from=From}, State), 0}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({Ref, Reply}, State) -> + case lists:keytake(Ref, #request.ref, State#state.running) of + {value, Request, Remaining} -> + erlang:demonitor(Ref, [flush]), + gen_server:reply(Request#request.from, Reply), + {noreply, State#state{running=Remaining}, 0}; + false -> + {noreply, State, 0} + end; + +handle_info({'DOWN', Ref, _, _, Reason}, State) -> + case lists:keytake(Ref, #request.ref, State#state.running) of + {value, Request, Remaining} -> + gen_server:reply(Request#request.from, {'EXIT', Reason}), + {noreply, State#state{running=Remaining}, 0}; + false -> + {noreply, State, 0} + end; + +handle_info(timeout, State) -> + {noreply, maybe_submit_request(State)}. + +code_change(_Vsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, _State) -> + ok. + +enqueue_request(#request{class=compaction}=Request, #state{}=State) -> + State#state{compaction=queue:in(Request, State#state.compaction)}; +enqueue_request(#request{}=Request, #state{}=State) -> + State#state{interactive=queue:in(Request, State#state.interactive)}. + +maybe_submit_request(#state{concurrency=Concurrency, running=Running}=State) + when length(Running) < Concurrency -> + case make_next_request(State) of + State -> + State; + NewState when length(Running) >= Concurrency - 1 -> + NewState; + NewState -> + maybe_submit_request(NewState) + end; +maybe_submit_request(State) -> + State. + +make_next_request(#state{}=State) -> + case {queue:is_empty(State#state.compaction), queue:is_empty(State#state.interactive)} of + {true, true} -> + State; + {true, false} -> + choose_next_request(#state.interactive, State); + {false, true} -> + choose_next_request(#state.compaction, State); + {false, false} -> + case random:uniform() < State#state.ratio of + true -> + choose_next_request(#state.compaction, State); + false -> + choose_next_request(#state.interactive, State) + end + end. + +choose_next_request(Index, State) -> + case queue:out(element(Index, State)) of + {empty, _} -> + State; + {{value, Request}, Q} -> + submit_request(Request, setelement(Index, State, Q)) + end. + +submit_request(#request{}=Request, #state{}=State) -> + Ref = erlang:monitor(process, Request#request.fd), + Request#request.fd ! {'$gen_call', {self(), Ref}, Request#request.msg}, + State#state{running = [Request#request{ref=Ref} | State#state.running]}. diff --git a/src/ioq_app.erl b/src/ioq_app.erl new file mode 100644 index 000000000..2e6d75acb --- /dev/null +++ b/src/ioq_app.erl @@ -0,0 +1,21 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(ioq_app). +-behaviour(application). +-export([start/2, stop/1]). + +start(_StartType, _StartArgs) -> + ioq_sup:start_link(). + +stop(_State) -> + ok. diff --git a/src/ioq_sup.erl b/src/ioq_sup.erl new file mode 100644 index 000000000..c4d04a9e4 --- /dev/null +++ b/src/ioq_sup.erl @@ -0,0 +1,24 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(ioq_sup). +-behaviour(supervisor). +-export([start_link/0, init/1]). + +%% Helper macro for declaring children of supervisor +-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + {ok, { {one_for_one, 5, 10}, [?CHILD(ioq, worker)]}}. -- cgit v1.2.1 From 04aea732058c5f0b7081b1fa444dbf4b3761cd02 Mon Sep 17 00:00:00 2001 From: Robert Newson Date: Mon, 1 Sep 2014 19:20:09 +0100 Subject: new IOQ api --- src/ioq.erl | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/ioq.erl b/src/ioq.erl index 686a6964c..4983b73d6 100644 --- a/src/ioq.erl +++ b/src/ioq.erl @@ -13,7 +13,7 @@ -module(ioq). -behaviour(gen_server). --export([start_link/0, call/2]). +-export([start_link/0, call/3]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]). -record(state, { @@ -27,7 +27,7 @@ -record(request, { fd, msg, - class, + priority, from, ref }). @@ -35,8 +35,8 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -call(Fd, Msg) -> - Request = #request{fd=Fd, msg=Msg, class=get(io_class), from=self()}, +call(Fd, Msg, Priority) -> + Request = #request{fd=Fd, msg=Msg, priority=Priority, from=self()}, gen_server:call(?MODULE, Request, infinity). init(_) -> @@ -77,7 +77,9 @@ code_change(_Vsn, State, _Extra) -> terminate(_Reason, _State) -> ok. -enqueue_request(#request{class=compaction}=Request, #state{}=State) -> +enqueue_request(#request{priority={db_compact, _}}=Request, #state{}=State) -> + State#state{compaction=queue:in(Request, State#state.compaction)}; +enqueue_request(#request{priority={view_compact, _, _}}=Request, #state{}=State) -> State#state{compaction=queue:in(Request, State#state.compaction)}; enqueue_request(#request{}=Request, #state{}=State) -> State#state{interactive=queue:in(Request, State#state.interactive)}. -- cgit v1.2.1 From c7f9ad1c0f77b601aa456e62c963793d517c1026 Mon Sep 17 00:00:00 2001 From: Robert Newson Date: Fri, 5 Sep 2014 18:22:44 +0100 Subject: Fallback to direct I/O if ioq is not running --- src/ioq.erl | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/ioq.erl b/src/ioq.erl index 4983b73d6..b761a0b41 100644 --- a/src/ioq.erl +++ b/src/ioq.erl @@ -37,7 +37,12 @@ start_link() -> call(Fd, Msg, Priority) -> Request = #request{fd=Fd, msg=Msg, priority=Priority, from=self()}, - gen_server:call(?MODULE, Request, infinity). + try + gen_server:call(?MODULE, Request, infinity) + catch + exit:{noproc,_} -> + gen_server:call(Fd, Msg, infinity) + end. init(_) -> Ratio = list_to_float(config:get("ioq", "ratio", "0.01")), -- cgit v1.2.1 From 40d157f39c0fa0d80db9ccf61b770b72103ddbed Mon Sep 17 00:00:00 2001 From: Alexander Shorin Date: Mon, 22 Sep 2014 17:32:44 +0400 Subject: Update state on config changes --- src/ioq.erl | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/src/ioq.erl b/src/ioq.erl index b761a0b41..9bfb1f8e3 100644 --- a/src/ioq.erl +++ b/src/ioq.erl @@ -12,10 +12,14 @@ -module(ioq). -behaviour(gen_server). +-behaviour(config_listener). -export([start_link/0, call/3]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]). +% config_listener api +-export([handle_config_change/5]). + -record(state, { concurrency=10, ratio, @@ -45,12 +49,19 @@ call(Fd, Msg, Priority) -> end. init(_) -> + ok = config:listen_for_changes(?MODULE, nil), + State = #state{}, + {ok, read_config(State)}. + +read_config(State) -> Ratio = list_to_float(config:get("ioq", "ratio", "0.01")), - {ok, #state{ratio=Ratio}}. + State#state{ratio=Ratio}. handle_call(#request{}=Request, From, State) -> {noreply, enqueue_request(Request#request{from=From}, State), 0}. +handle_cast(change, State) -> + {noreply, read_config(State)}; handle_cast(_Msg, State) -> {noreply, State}. @@ -76,6 +87,11 @@ handle_info({'DOWN', Ref, _, _, Reason}, State) -> handle_info(timeout, State) -> {noreply, maybe_submit_request(State)}. +handle_config_change("ioq", _, _, _, _) -> + {ok, gen_server:cast(?MODULE, change)}; +handle_config_change(_, _, _, _, _) -> + {ok, nil}. + code_change(_Vsn, State, _Extra) -> {ok, State}. -- cgit v1.2.1 From c552c665dc6af0ca750ce074c816c4fcb0f05962 Mon Sep 17 00:00:00 2001 From: Alexander Shorin Date: Mon, 22 Sep 2014 17:35:37 +0400 Subject: Allow to customize concurrency value --- src/ioq.erl | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/ioq.erl b/src/ioq.erl index 9bfb1f8e3..4598c3766 100644 --- a/src/ioq.erl +++ b/src/ioq.erl @@ -21,7 +21,7 @@ -export([handle_config_change/5]). -record(state, { - concurrency=10, + concurrency, ratio, interactive=queue:new(), compaction=queue:new(), @@ -55,7 +55,8 @@ init(_) -> read_config(State) -> Ratio = list_to_float(config:get("ioq", "ratio", "0.01")), - State#state{ratio=Ratio}. + Concurrency = list_to_integer(config:get("ioq", "concurrency", "10")), + State#state{concurrency=Concurrency, ratio=Ratio}. handle_call(#request{}=Request, From, State) -> {noreply, enqueue_request(Request#request{from=From}, State), 0}. -- cgit v1.2.1 From 217503577e23f26dfd2f9cbaa52e9f78aaa3b308 Mon Sep 17 00:00:00 2001 From: Alexander Shorin Date: Tue, 13 Jan 2015 03:28:39 +0300 Subject: Handle {gen_event_EXIT,{config_listener,ioq},shutdown} message --- src/ioq.erl | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/ioq.erl b/src/ioq.erl index 4598c3766..6c01b9c60 100644 --- a/src/ioq.erl +++ b/src/ioq.erl @@ -75,7 +75,6 @@ handle_info({Ref, Reply}, State) -> false -> {noreply, State, 0} end; - handle_info({'DOWN', Ref, _, _, Reason}, State) -> case lists:keytake(Ref, #request.ref, State#state.running) of {value, Request, Remaining} -> @@ -84,7 +83,12 @@ handle_info({'DOWN', Ref, _, _, Reason}, State) -> false -> {noreply, State, 0} end; - +handle_info({gen_event_EXIT, {config_listener, ?MODULE}, _Reason}, State) -> + erlang:send_after(5000, self(), restart_config_listener), + {noreply, State}; +handle_info(restart_config_listener, State) -> + ok = config:listen_for_changes(?MODULE, nil), + {noreply, State}; handle_info(timeout, State) -> {noreply, maybe_submit_request(State)}. -- cgit v1.2.1 From ad60d329a038d2b1aa5ee083f22b1ee7906ec31d Mon Sep 17 00:00:00 2001 From: ILYA Khlopotov Date: Fri, 30 Jan 2015 10:59:43 -0800 Subject: Update config_listener behaviuor COUCHDB-2561 --- src/ioq.erl | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/ioq.erl b/src/ioq.erl index 6c01b9c60..c4b3b4e30 100644 --- a/src/ioq.erl +++ b/src/ioq.erl @@ -18,7 +18,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]). % config_listener api --export([handle_config_change/5]). +-export([handle_config_change/5, handle_config_terminate/3]). -record(state, { concurrency, @@ -83,12 +83,6 @@ handle_info({'DOWN', Ref, _, _, Reason}, State) -> false -> {noreply, State, 0} end; -handle_info({gen_event_EXIT, {config_listener, ?MODULE}, _Reason}, State) -> - erlang:send_after(5000, self(), restart_config_listener), - {noreply, State}; -handle_info(restart_config_listener, State) -> - ok = config:listen_for_changes(?MODULE, nil), - {noreply, State}; handle_info(timeout, State) -> {noreply, maybe_submit_request(State)}. @@ -97,6 +91,13 @@ handle_config_change("ioq", _, _, _, _) -> handle_config_change(_, _, _, _, _) -> {ok, nil}. +handle_config_terminate(_, _, _) -> + spawn(fun() -> + timer:sleep(5000), + config:listen_for_changes(?MODULE, nil) + end), + ok. + code_change(_Vsn, State, _Extra) -> {ok, State}. -- cgit v1.2.1 From 0ffa7cd9fd1e89ae667ed234d21e04696e3033ae Mon Sep 17 00:00:00 2001 From: ILYA Khlopotov Date: Fri, 30 Jan 2015 11:30:02 -0800 Subject: Don't restart event handler on termination COUCHDB-2561 --- src/ioq.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/ioq.erl b/src/ioq.erl index c4b3b4e30..967a49b86 100644 --- a/src/ioq.erl +++ b/src/ioq.erl @@ -91,6 +91,7 @@ handle_config_change("ioq", _, _, _, _) -> handle_config_change(_, _, _, _, _) -> {ok, nil}. +handle_config_terminate(_, stop, _) -> ok; handle_config_terminate(_, _, _) -> spawn(fun() -> timer:sleep(5000), -- cgit v1.2.1 From bdcfe6aa3a33c28411f0e4b9b121fff33474c3eb Mon Sep 17 00:00:00 2001 From: ILYA Khlopotov Date: Mon, 22 Aug 2016 14:42:07 -0700 Subject: Update handle_config_terminate API COUCHDB-3102 --- src/ioq.erl | 16 +++++++++------- src/ioq_sup.erl | 23 ++++++++++++++++++++++- 2 files changed, 31 insertions(+), 8 deletions(-) diff --git a/src/ioq.erl b/src/ioq.erl index 967a49b86..93377d6d3 100644 --- a/src/ioq.erl +++ b/src/ioq.erl @@ -20,6 +20,8 @@ % config_listener api -export([handle_config_change/5, handle_config_terminate/3]). +-define(RELISTEN_DELAY, 5000). + -record(state, { concurrency, ratio, @@ -83,6 +85,9 @@ handle_info({'DOWN', Ref, _, _, Reason}, State) -> false -> {noreply, State, 0} end; +handle_info(restart_config_listener, State) -> + ok = config:listen_for_changes(?MODULE, nil), + {noreply, State}; handle_info(timeout, State) -> {noreply, maybe_submit_request(State)}. @@ -91,13 +96,10 @@ handle_config_change("ioq", _, _, _, _) -> handle_config_change(_, _, _, _, _) -> {ok, nil}. -handle_config_terminate(_, stop, _) -> ok; -handle_config_terminate(_, _, _) -> - spawn(fun() -> - timer:sleep(5000), - config:listen_for_changes(?MODULE, nil) - end), - ok. +handle_config_terminate(_Server, stop, _State) -> + ok; +handle_config_terminate(_Server, _Reason, _State) -> + erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener). code_change(_Vsn, State, _Extra) -> {ok, State}. diff --git a/src/ioq_sup.erl b/src/ioq_sup.erl index c4d04a9e4..56e51ae42 100644 --- a/src/ioq_sup.erl +++ b/src/ioq_sup.erl @@ -13,6 +13,7 @@ -module(ioq_sup). -behaviour(supervisor). -export([start_link/0, init/1]). +-export([handle_config_change/5, handle_config_terminate/3]). %% Helper macro for declaring children of supervisor -define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). @@ -21,4 +22,24 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - {ok, { {one_for_one, 5, 10}, [?CHILD(ioq, worker)]}}. + {ok, { {one_for_one, 5, 10}, [ + { + config_listener_mon, + {config_listener_mon, start_link, [?MODULE, nil]}, + permanent, + 5000, + worker, + [config_listener_mon] + }, + ?CHILD(ioq_server, worker) + ]} }. + +handle_config_change("ioq", _Key, _Val, _Persist, St) -> + gen_server:cast(ioq_server, update_config), + {ok, St}; +handle_config_change(_Sec, _Key, _Val, _Persist, St) -> + {ok, St}. + +handle_config_terminate(_Server, _Reason, _State) -> + gen_server:cast(ioq_server, update_config), + ok. -- cgit v1.2.1 From 126a849fa394f336bc769e85adad143a651c4ec1 Mon Sep 17 00:00:00 2001 From: ILYA Khlopotov Date: Tue, 23 Aug 2016 14:23:24 -0700 Subject: Fix a typo in a child name COUCHDB-3102 --- src/ioq_sup.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ioq_sup.erl b/src/ioq_sup.erl index 56e51ae42..fa919e4c5 100644 --- a/src/ioq_sup.erl +++ b/src/ioq_sup.erl @@ -31,7 +31,7 @@ init([]) -> worker, [config_listener_mon] }, - ?CHILD(ioq_server, worker) + ?CHILD(ioq, worker) ]} }. handle_config_change("ioq", _Key, _Val, _Persist, St) -> -- cgit v1.2.1 From 5f5375a0d4fb1ceae9a14ba28ad991ff020a9c9e Mon Sep 17 00:00:00 2001 From: Eric Avdey Date: Mon, 3 Oct 2016 11:42:02 -0300 Subject: Remove unused code We are subscribing both ioq and ioq_sup to config_event, but while ioq is actually processing config changes, ioq_sup just sends uncatched messages to unexisting ioq_server. --- src/ioq_sup.erl | 23 +---------------------- 1 file changed, 1 insertion(+), 22 deletions(-) diff --git a/src/ioq_sup.erl b/src/ioq_sup.erl index fa919e4c5..c4d04a9e4 100644 --- a/src/ioq_sup.erl +++ b/src/ioq_sup.erl @@ -13,7 +13,6 @@ -module(ioq_sup). -behaviour(supervisor). -export([start_link/0, init/1]). --export([handle_config_change/5, handle_config_terminate/3]). %% Helper macro for declaring children of supervisor -define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). @@ -22,24 +21,4 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - {ok, { {one_for_one, 5, 10}, [ - { - config_listener_mon, - {config_listener_mon, start_link, [?MODULE, nil]}, - permanent, - 5000, - worker, - [config_listener_mon] - }, - ?CHILD(ioq, worker) - ]} }. - -handle_config_change("ioq", _Key, _Val, _Persist, St) -> - gen_server:cast(ioq_server, update_config), - {ok, St}; -handle_config_change(_Sec, _Key, _Val, _Persist, St) -> - {ok, St}. - -handle_config_terminate(_Server, _Reason, _State) -> - gen_server:cast(ioq_server, update_config), - ok. + {ok, { {one_for_one, 5, 10}, [?CHILD(ioq, worker)]}}. -- cgit v1.2.1 From 345804ce4d34786acbf0f498a93eac7013a2b0b5 Mon Sep 17 00:00:00 2001 From: Nick Vatamaniuc Date: Thu, 5 Oct 2017 12:05:28 -0400 Subject: Use couch_rand compatibility module --- src/ioq.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ioq.erl b/src/ioq.erl index 93377d6d3..9ca26567a 100644 --- a/src/ioq.erl +++ b/src/ioq.erl @@ -136,7 +136,7 @@ make_next_request(#state{}=State) -> {false, true} -> choose_next_request(#state.compaction, State); {false, false} -> - case random:uniform() < State#state.ratio of + case couch_rand:uniform() < State#state.ratio of true -> choose_next_request(#state.compaction, State); false -> -- cgit v1.2.1 From e641a740978447f0b29785580e46d2e30e822001 Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Wed, 18 Dec 2019 15:22:18 -0500 Subject: Enable users to bypass IOQ for certain IO classes This patch allows an administrator to configure a "bypass" which will cause a particular class of IO to be submitted directly to the file descriptor or OS process instead of going through the IO queueing mechanism. Installing a bypass can result in higher throughput and lower latency, at the expense of less control over the stability of the system. A bypass is configured via the `ioq.priority` configuration block: [ioq.bypass] read = true write = true compaction = false This configuration will cause user-submitted read IO to be submitted directly. At this time the following classes are available: - os_process - read - write - view_update - shard_sync - compaction This also expands the "compaction" queue to be a general-purpose "background" queue that handles IO for both compaction and internal replication (aka shard_sync). The other four classes are handled by the "interactive" queue. As before, the [ioq] ratio setting determines the likelihood that background IO will be selected ahead of interactive IO when both queues are non-empty. --- src/ioq.erl | 49 ++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 40 insertions(+), 9 deletions(-) diff --git a/src/ioq.erl b/src/ioq.erl index 9ca26567a..81d94a36f 100644 --- a/src/ioq.erl +++ b/src/ioq.erl @@ -26,7 +26,7 @@ concurrency, ratio, interactive=queue:new(), - compaction=queue:new(), + background=queue:new(), running=[] }). @@ -41,7 +41,38 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -call(Fd, Msg, Priority) -> +call(Fd, Msg, Metadata) -> + Priority = io_class(Msg, Metadata), + case bypass(Priority) of + true -> + gen_server:call(Fd, Msg); + false -> + queued_call(Fd, Msg, Priority) + end. + +bypass(Priority) -> + config:get("ioq.bypass", atom_to_list(Priority)) =:= "true". + +io_class({prompt, _}, _) -> + os_process; +io_class({data, _}, _) -> + os_process; +io_class(_, {interactive, _}) -> + read; +io_class(_, {db_update, _}) -> + write; +io_class(_, {view_update, _, _}) -> + view_update; +io_class(_, {internal_repl, _}) -> + shard_sync; +io_class(_, {db_compact, _}) -> + compaction; +io_class(_, {view_compact, _, _}) -> + compaction; +io_class(_, _) -> + other. + +queued_call(Fd, Msg, Priority) -> Request = #request{fd=Fd, msg=Msg, priority=Priority, from=self()}, try gen_server:call(?MODULE, Request, infinity) @@ -107,10 +138,10 @@ code_change(_Vsn, State, _Extra) -> terminate(_Reason, _State) -> ok. -enqueue_request(#request{priority={db_compact, _}}=Request, #state{}=State) -> - State#state{compaction=queue:in(Request, State#state.compaction)}; -enqueue_request(#request{priority={view_compact, _, _}}=Request, #state{}=State) -> - State#state{compaction=queue:in(Request, State#state.compaction)}; +enqueue_request(#request{priority=compaction}=Request, #state{}=State) -> + State#state{background=queue:in(Request, State#state.background)}; +enqueue_request(#request{priority=shard_sync}=Request, #state{}=State) -> + State#state{background=queue:in(Request, State#state.background)}; enqueue_request(#request{}=Request, #state{}=State) -> State#state{interactive=queue:in(Request, State#state.interactive)}. @@ -128,17 +159,17 @@ maybe_submit_request(State) -> State. make_next_request(#state{}=State) -> - case {queue:is_empty(State#state.compaction), queue:is_empty(State#state.interactive)} of + case {queue:is_empty(State#state.background), queue:is_empty(State#state.interactive)} of {true, true} -> State; {true, false} -> choose_next_request(#state.interactive, State); {false, true} -> - choose_next_request(#state.compaction, State); + choose_next_request(#state.background, State); {false, false} -> case couch_rand:uniform() < State#state.ratio of true -> - choose_next_request(#state.compaction, State); + choose_next_request(#state.background, State); false -> choose_next_request(#state.interactive, State) end -- cgit v1.2.1 From 2ed9a6c84ddb10f4cfba36bb190f3c78b9405e2e Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Mon, 6 Jan 2020 13:46:24 -0500 Subject: Configure IOQ defaults --- rel/overlay/etc/default.ini | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index a1df0805a..f5f057859 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -512,6 +512,40 @@ min_priority = 2.0 [smoosh.ratio_views] min_priority = 2.0 +[ioq] +; The maximum number of concurrent in-flight IO requests that +concurrency = 10 + +; The fraction of the time that a background IO request will be selected +; over an interactive IO request when both queues are non-empty +ratio = 0.01 + +[ioq.bypass] +; System administrators can choose to submit specific classes of IO directly +; to the underlying file descriptor or OS process, bypassing the queues +; altogether. Installing a bypass can yield higher throughput and lower +; latency, but relinquishes some control over prioritization. The following +; classes are recognized with the following defaults: + +; Messages on their way to an external process (e.g., couchjs) are bypassed +os_process = true + +; Disk IO fulfilling interactive read requests is bypassed +read = true + +; Disk IO required to update a database is bypassed +write = true + +; Disk IO required to update views and other secondary indexes is bypassed +view_update = true + +; Disk IO issued by the background replication processes that fix any +; inconsistencies between shard copies is queued +shard_sync = false + +; Disk IO issued by compaction jobs is queued +compaction = false + [dreyfus] ; The name and location of the Clouseau Java service required to ; enable Search functionality. -- cgit v1.2.1 From 7c12f14679a7b91abe287cc10bf328a595bf8d33 Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Mon, 6 Jan 2020 13:26:28 -0500 Subject: Remove ioq/hqueue dependencies, add ioq as subdir The hqueue dependency is only needed for experimental IOQ2 functionality that is not included in the codebase we're bringing into the mainline. --- rebar.config.script | 3 +-- rel/reltool.config | 2 -- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/rebar.config.script b/rebar.config.script index dace4e26c..5d5a6aac3 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -131,6 +131,7 @@ SubDirs = [ "src/dreyfus", "src/fabric", "src/global_changes", + "src/ioq", "src/ken", "src/mango", "src/rexi", @@ -146,8 +147,6 @@ DepDescs = [ {ets_lru, "ets-lru", {tag, "1.0.0"}}, {khash, "khash", {tag, "1.0.1"}}, {snappy, "snappy", {tag, "CouchDB-1.0.4"}}, -{ioq, "ioq", {tag, "2.1.2"}}, -{hqueue, "hqueue", {tag, "CouchDB-1.0.1-1"}}, %% Non-Erlang deps {docs, {url, "https://github.com/apache/couchdb-documentation"}, diff --git a/rel/reltool.config b/rel/reltool.config index 512e45c44..1be2ccc9a 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -47,7 +47,6 @@ fabric, folsom, global_changes, - hqueue, hyper, ibrowse, ioq, @@ -105,7 +104,6 @@ {app, fabric, [{incl_cond, include}]}, {app, folsom, [{incl_cond, include}]}, {app, global_changes, [{incl_cond, include}]}, - {app, hqueue, [{incl_cond, include}]}, {app, hyper, [{incl_cond, include}]}, {app, ibrowse, [{incl_cond, include}]}, {app, ioq, [{incl_cond, include}]}, -- cgit v1.2.1 From 0c33ed151df9524719bc190a7d7a26dfc3c60afa Mon Sep 17 00:00:00 2001 From: "Paul J. Davis" Date: Mon, 6 Jan 2020 11:47:43 -0600 Subject: Set `couchTests.elixir = true` to skip ported tests This avoids the 1.2s pause between tests to save time during the test suite. All ported tests are also logged to measure our progress porting the JS test suite. --- test/javascript/tests/auth_cache.js | 2 +- test/javascript/tests/cookie_auth.js | 2 +- test/javascript/tests/users_db.js | 2 +- test/javascript/tests/utf8.js | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/test/javascript/tests/auth_cache.js b/test/javascript/tests/auth_cache.js index ca8f077e7..73fec3532 100644 --- a/test/javascript/tests/auth_cache.js +++ b/test/javascript/tests/auth_cache.js @@ -10,8 +10,8 @@ // License for the specific language governing permissions and limitations under // the License. +couchTests.elixir = true; couchTests.auth_cache = function(debug) { - return console.log('done in test/elixir/test/auth_cache_test.exs'); if (debug) debugger; // Simple secret key generator diff --git a/test/javascript/tests/cookie_auth.js b/test/javascript/tests/cookie_auth.js index 0dce6bdb6..2d49ebe1c 100644 --- a/test/javascript/tests/cookie_auth.js +++ b/test/javascript/tests/cookie_auth.js @@ -10,9 +10,9 @@ // License for the specific language governing permissions and limitations under // the License. +couchTests.elixir = true; couchTests.cookie_auth = function(debug) { // This tests cookie-based authentication. - return console.log('done in test/elixir/test/cookie_auth_test.exs'); var db_name = get_random_db_name(); var db = new CouchDB(db_name, {"X-Couch-Full-Commit":"false"}); diff --git a/test/javascript/tests/users_db.js b/test/javascript/tests/users_db.js index b13adffec..3ce80256c 100644 --- a/test/javascript/tests/users_db.js +++ b/test/javascript/tests/users_db.js @@ -10,8 +10,8 @@ // License for the specific language governing permissions and limitations under // the License. +couchTests.elixir = true; couchTests.users_db = function(debug) { - return console.log('done in test/elixir/test/users_db_test.exs'); // This tests the users db, especially validations // this should also test that you can log into the couch diff --git a/test/javascript/tests/utf8.js b/test/javascript/tests/utf8.js index a1092c128..cee4d30cb 100644 --- a/test/javascript/tests/utf8.js +++ b/test/javascript/tests/utf8.js @@ -10,8 +10,8 @@ // License for the specific language governing permissions and limitations under // the License. +couchTests.elixir = true; couchTests.utf8 = function(debug) { - return console.log('done in test/elixir/test/utf8_test.exs'); var db_name = get_random_db_name(); var db = new CouchDB(db_name, {"X-Couch-Full-Commit":"false"}); db.createDb(); -- cgit v1.2.1 From f4a7c240683e0375f6a1c8343a51c2fa20e79a57 Mon Sep 17 00:00:00 2001 From: "Paul J. Davis" Date: Mon, 6 Jan 2020 16:09:30 -0600 Subject: Remove allowance for unnamed_error This test is actually checking the behvior of an OOM in `couchjs` now since we lifted the OS process timeout limit. --- test/javascript/tests/view_errors.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/javascript/tests/view_errors.js b/test/javascript/tests/view_errors.js index 7577b8086..f135b749a 100644 --- a/test/javascript/tests/view_errors.js +++ b/test/javascript/tests/view_errors.js @@ -154,7 +154,7 @@ couchTests.view_errors = function(debug) { db.view("infinite/infinite_loop"); T(0 == 1); } catch(e) { - T(e.error == "os_process_error" || e.error == "unnamed_error"); + T(e.error == "os_process_error"); } // Check error responses for invalid multi-get bodies. -- cgit v1.2.1 From 1169c4470f73023370b1943c5ebe4bc89ae016d6 Mon Sep 17 00:00:00 2001 From: "Paul J. Davis" Date: Mon, 6 Jan 2020 11:06:40 -0600 Subject: Match the OOM beahvior of 1.8.5 Apparently SpiderMonkey 60 changed the behavior of OOM errors to not exit the VM. This updates the SpiderMonkey 60 implementation to match that behavior. --- src/couch/priv/couch_js/60/main.cpp | 1 + src/couch/priv/couch_js/60/util.cpp | 8 ++++++ src/couch/priv/couch_js/60/util.h | 1 + src/couch/test/eunit/couch_js_tests.erl | 45 +++++++++++++++++++++++++++++++++ 4 files changed, 55 insertions(+) create mode 100644 src/couch/test/eunit/couch_js_tests.erl diff --git a/src/couch/priv/couch_js/60/main.cpp b/src/couch/priv/couch_js/60/main.cpp index ecedfbd3b..e36bc619b 100644 --- a/src/couch/priv/couch_js/60/main.cpp +++ b/src/couch/priv/couch_js/60/main.cpp @@ -420,6 +420,7 @@ main(int argc, const char* argv[]) return 1; JS::SetWarningReporter(cx, couch_error); + JS::SetOutOfMemoryCallback(cx, couch_oom, NULL); JS_SetContextPrivate(cx, args); JS_SetSecurityCallbacks(cx, &security_callbacks); diff --git a/src/couch/priv/couch_js/60/util.cpp b/src/couch/priv/couch_js/60/util.cpp index 894b4254e..92c6cbf4a 100644 --- a/src/couch/priv/couch_js/60/util.cpp +++ b/src/couch/priv/couch_js/60/util.cpp @@ -309,6 +309,14 @@ couch_error(JSContext* cx, JSErrorReport* report) } +void +couch_oom(JSContext* cx, void* data) +{ + fprintf(stderr, "out of memory\n"); + exit(1); +} + + bool couch_load_funcs(JSContext* cx, JS::HandleObject obj, JSFunctionSpec* funcs) { diff --git a/src/couch/priv/couch_js/60/util.h b/src/couch/priv/couch_js/60/util.h index 45caa341f..407e3e602 100644 --- a/src/couch/priv/couch_js/60/util.h +++ b/src/couch/priv/couch_js/60/util.h @@ -35,6 +35,7 @@ JSString* couch_readline(JSContext* cx, FILE* fp); size_t couch_readfile(const char* file, char** outbuf_p); void couch_print(JSContext* cx, unsigned int argc, JS::CallArgs argv); void couch_error(JSContext* cx, JSErrorReport* report); +void couch_oom(JSContext* cx, void* data); bool couch_load_funcs(JSContext* cx, JS::HandleObject obj, JSFunctionSpec* funcs); diff --git a/src/couch/test/eunit/couch_js_tests.erl b/src/couch/test/eunit/couch_js_tests.erl new file mode 100644 index 000000000..d3d92a288 --- /dev/null +++ b/src/couch/test/eunit/couch_js_tests.erl @@ -0,0 +1,45 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_js_tests). +-include_lib("eunit/include/eunit.hrl"). + + +-define(FUNC, << + "function(doc) {\n" + " var val = \"0123456789ABCDEF\";\n" + " while(true) {emit(val, val);}\n" + "}\n" +>>). + + +couch_js_test_() -> + { + "Test couchjs", + { + setup, + fun test_util:start_couch/0, + fun test_util:stop_couch/1, + [ + fun should_exit_on_oom/0 + ] + } + }. + + +should_exit_on_oom() -> + Proc = couch_query_servers:get_os_process(<<"javascript">>), + true = couch_query_servers:proc_prompt(Proc, [<<"add_fun">>, ?FUNC]), + ?assertThrow( + {os_process_error, {exit_status, 1}}, + couch_query_servers:proc_prompt(Proc, [<<"map_doc">>, <<"{}">>]) + ). -- cgit v1.2.1