diff options
author | Robert Newson <rnewson@apache.org> | 2013-04-20 15:49:43 +0100 |
---|---|---|
committer | Robert Newson <rnewson@apache.org> | 2013-04-24 16:28:54 +0100 |
commit | 1d1cd9ad097fbe3dd0977857fb3c08de23d8f4e0 (patch) | |
tree | f4e82d1c5a0f324c6faa22d50b1fd7f95f09ba95 | |
parent | ae6f1ebd8d0c63384050eb8c83b401a01095ad2c (diff) | |
download | couchdb-1775-feature-io-regulator.tar.gz |
Deprioritize compaction I/O1775-feature-io-regulator
When there are compaction vs non-compaction I/O requests, service the
non-compaction requests with higher probability.
Note: For demonstration purposes at the moment, the code is likely too
slow for production use.
COUCHDB-1775
-rw-r--r-- | src/couch_index/src/couch_index_compactor.erl | 1 | ||||
-rw-r--r-- | src/couchdb/Makefile.am | 2 | ||||
-rw-r--r-- | src/couchdb/couch_db_updater.erl | 1 | ||||
-rw-r--r-- | src/couchdb/couch_file.erl | 10 | ||||
-rw-r--r-- | src/couchdb/couch_io_regulator.erl | 126 | ||||
-rw-r--r-- | src/couchdb/couch_primary_sup.erl | 6 |
6 files changed, 141 insertions, 5 deletions
diff --git a/src/couch_index/src/couch_index_compactor.erl b/src/couch_index/src/couch_index_compactor.erl index 72bff514c..2360e21ef 100644 --- a/src/couch_index/src/couch_index_compactor.erl +++ b/src/couch_index/src/couch_index_compactor.erl @@ -97,6 +97,7 @@ compact(Parent, Mod, IdxState) -> compact(Parent, Mod, IdxState, []). compact(Idx, Mod, IdxState, Opts) -> + erlang:put(io_class, compaction), DbName = Mod:get(db_name, IdxState), Args = [DbName, Mod:get(idx_name, IdxState)], ?LOG_INFO("Compaction started for db: ~s idx: ~s", Args), diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am index 9fe19bcd3..5b150f0fb 100644 --- a/src/couchdb/Makefile.am +++ b/src/couchdb/Makefile.am @@ -57,6 +57,7 @@ source_files = \ couch_httpd_rewrite.erl \ couch_httpd_stats_handlers.erl \ couch_httpd_vhost.erl \ + couch_io_regulator.erl \ couch_key_tree.erl \ couch_log.erl \ couch_native_process.erl \ @@ -114,6 +115,7 @@ compiled_files = \ couch_httpd_rewrite.beam \ couch_httpd_stats_handlers.beam \ couch_httpd_vhost.beam \ + couch_io_regulator.beam \ couch_key_tree.beam \ couch_log.beam \ couch_native_process.beam \ diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl index 0ce30483d..91a8b74d0 100644 --- a/src/couchdb/couch_db_updater.erl +++ b/src/couchdb/couch_db_updater.erl @@ -968,6 +968,7 @@ copy_compact(Db, NewDb0, Retry) -> commit_data(NewDb4#db{update_seq=Db#db.update_seq}). start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=PurgeSeq}}=Db) -> + erlang:put(io_class, compaction), CompactFile = Filepath ++ ".compact", ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]), case couch_file:open(CompactFile) of diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl index ee5dafbf8..5b837cf2a 100644 --- a/src/couchdb/couch_file.erl +++ b/src/couchdb/couch_file.erl @@ -106,14 +106,14 @@ append_term_md5(Fd, Term, Options) -> %%---------------------------------------------------------------------- append_binary(Fd, Bin) -> - gen_server:call(Fd, {append_bin, assemble_file_chunk(Bin)}, infinity). + couch_io_regulator:io(Fd, {append_bin, assemble_file_chunk(Bin)}). append_binary_md5(Fd, Bin) -> - gen_server:call(Fd, - {append_bin, assemble_file_chunk(Bin, couch_util:md5(Bin))}, infinity). + couch_io_regulator:io(Fd, + {append_bin, assemble_file_chunk(Bin, couch_util:md5(Bin))}). append_raw_chunk(Fd, Chunk) -> - gen_server:call(Fd, {append_bin, Chunk}, infinity). + couch_io_regulator:io(Fd, {append_bin, Chunk}). assemble_file_chunk(Bin) -> @@ -148,7 +148,7 @@ pread_binary(Fd, Pos) -> pread_iolist(Fd, Pos) -> - case gen_server:call(Fd, {pread_iolist, Pos}, infinity) of + case couch_io_regulator:io(Fd, {pread_iolist, Pos}) of {ok, IoList, <<>>} -> {ok, IoList}; {ok, IoList, Md5} -> diff --git a/src/couchdb/couch_io_regulator.erl b/src/couchdb/couch_io_regulator.erl new file mode 100644 index 000000000..a740756f1 --- /dev/null +++ b/src/couchdb/couch_io_regulator.erl @@ -0,0 +1,126 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_io_regulator). +-behaviour(gen_server). + +-export([start_link/0, io/2]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]). + +-record(state, { + concurrency=10, + ratio, + interactive=queue:new(), + compaction=queue:new(), + running=[] +}). + +-record(request, { + fd, + msg, + class, + from, + ref +}). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +io(Fd, Msg) -> + Request = #request{fd=Fd, msg=Msg, class=get(io_class), from=self()}, + gen_server:call(?MODULE, Request, infinity). + +init(_) -> + Ratio = list_to_float(couch_config:get("io", "ratio", "0.01")), + {ok, #state{ratio=Ratio}}. + +handle_call(#request{}=Request, From, State) -> + {noreply, enqueue_request(Request#request{from=From}, State), 0}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({Ref, Reply}, State) -> + case lists:keytake(Ref, #request.ref, State#state.running) of + {value, Request, Remaining} -> + erlang:demonitor(Ref, [flush]), + gen_server:reply(Request#request.from, Reply), + {noreply, State#state{running=Remaining}, 0}; + false -> + {noreply, State, 0} + end; + +handle_info({'DOWN', Ref, _, _, Reason}, State) -> + case lists:keytake(Ref, #request.ref, State#state.running) of + {value, Request, Remaining} -> + gen_server:reply(Request#request.from, {'EXIT', Reason}), + {noreply, State#state{running=Remaining}, 0}; + false -> + {noreply, State, 0} + end; + +handle_info(timeout, State) -> + {noreply, maybe_submit_request(State)}. + +code_change(_Vsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, _State) -> + ok. + +enqueue_request(#request{class=compaction}=Request, #state{}=State) -> + State#state{compaction=queue:in(Request, State#state.compaction)}; +enqueue_request(#request{}=Request, #state{}=State) -> + State#state{interactive=queue:in(Request, State#state.interactive)}. + +maybe_submit_request(#state{concurrency=Concurrency, running=Running}=State) + when length(Running) < Concurrency -> + case make_next_request(State) of + State -> + State; + NewState when length(Running) >= Concurrency - 1 -> + NewState; + NewState -> + maybe_submit_request(NewState) + end; +maybe_submit_request(State) -> + State. + +make_next_request(#state{}=State) -> + case {queue:is_empty(State#state.compaction), queue:is_empty(State#state.interactive)} of + {true, true} -> + State; + {true, false} -> + choose_next_request(#state.interactive, State); + {false, true} -> + choose_next_request(#state.compaction, State); + {false, false} -> + case random:uniform() < State#state.ratio of + true -> + choose_next_request(#state.compaction, State); + false -> + choose_next_request(#state.interactive, State) + end + end. + +choose_next_request(Index, State) -> + case queue:out(element(Index, State)) of + {empty, _} -> + State; + {{value, Request}, Q} -> + submit_request(Request, setelement(Index, State, Q)) + end. + +submit_request(#request{}=Request, #state{}=State) -> + Ref = erlang:monitor(process, Request#request.fd), + Request#request.fd ! {'$gen_call', {self(), Ref}, Request#request.msg}, + State#state{running = [Request#request{ref=Ref} | State#state.running]}. diff --git a/src/couchdb/couch_primary_sup.erl b/src/couchdb/couch_primary_sup.erl index 150b92e42..63315c3b5 100644 --- a/src/couchdb/couch_primary_sup.erl +++ b/src/couchdb/couch_primary_sup.erl @@ -31,6 +31,12 @@ init([]) -> brutal_kill, worker, [couch_task_status]}, + {couch_io_regulator, + {couch_io_regulator, start_link, []}, + permanent, + brutal_kill, + worker, + [couch_io_regulator]}, {couch_server, {couch_server, sup_start_link, []}, permanent, |