summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRussell Branca <chewbranca@apache.org>2021-08-23 18:19:50 -0700
committerRussell Branca <chewbranca@apache.org>2021-08-23 18:19:50 -0700
commit511cf2d15616366aab854cb4d1b798364bf14bf1 (patch)
tree50a33406e81b2fe31909551ba69d3b8a35902784
parent3dc261da0e0ab0c1428be01e0520aff777f5f5bd (diff)
downloadcouchdb-chewbranca-ioq-experiments.tar.gz
-rw-r--r--src/couch/src/couch_bt_engine_compactor.erl5
-rw-r--r--src/couch/src/couch_bt_engine_stream.erl8
-rw-r--r--src/couch/src/couch_file.erl4
-rw-r--r--src/couch/test/eunit/couch_btree_tests.erl11
-rw-r--r--src/couch/test/eunit/couch_file_tests.erl66
-rw-r--r--src/couch_mrview/src/couch_mrview_compactor.erl12
-rw-r--r--src/couch_mrview/src/couch_mrview_util.erl4
7 files changed, 86 insertions, 24 deletions
diff --git a/src/couch/src/couch_bt_engine_compactor.erl b/src/couch/src/couch_bt_engine_compactor.erl
index 3e356e2e3..297e70cd6 100644
--- a/src/couch/src/couch_bt_engine_compactor.erl
+++ b/src/couch/src/couch_bt_engine_compactor.erl
@@ -145,8 +145,9 @@ open_compaction_files(DbName, OldSt, Options) ->
retry = nil
}
end,
- unlink(DataFd),
- erlang:monitor(process, MetaFd),
+ unlink(ioq:fd_pid(DataFd)),
+ unlink(ioq:ioq_pid(DataFd)),
+ erlang:monitor(process, ioq:fd_pid(MetaFd)),
{ok, CompSt}.
diff --git a/src/couch/src/couch_bt_engine_stream.erl b/src/couch/src/couch_bt_engine_stream.erl
index 431894a50..e57e9a055 100644
--- a/src/couch/src/couch_bt_engine_stream.erl
+++ b/src/couch/src/couch_bt_engine_stream.erl
@@ -20,6 +20,8 @@
to_disk_term/1
]).
+-include_lib("ioq/include/ioq.hrl").
+
foldl({_Fd, []}, _Fun, Acc) ->
Acc;
@@ -56,9 +58,9 @@ seek({Fd, [Pos | Rest]}, Offset) when is_integer(Pos) ->
end.
-write({Fd, Written}, Data) when is_pid(Fd) ->
- {ok, Pos, _} = couch_file:append_binary(Fd, Data),
- {ok, {Fd, [{Pos, iolist_size(Data)} | Written]}}.
+write({#ioq_file{fd=Fd}=IOF, Written}, Data) when is_pid(Fd) ->
+ {ok, Pos, _} = couch_file:append_binary(IOF, Data),
+ {ok, {IOF, [{Pos, iolist_size(Data)} | Written]}}.
finalize({Fd, Written}) ->
diff --git a/src/couch/src/couch_file.erl b/src/couch/src/couch_file.erl
index 675e6d60e..a6eeef5c1 100644
--- a/src/couch/src/couch_file.erl
+++ b/src/couch/src/couch_file.erl
@@ -889,7 +889,9 @@ is_idle(#file{is_sys=false}) ->
-spec process_info(CouchFilePid :: pid()) ->
{Fd :: pid() | tuple(), FilePath :: string()} | undefined.
-process_info(Pid) ->
+process_info(Pid) when is_pid(Pid) ->
+ couch_util:process_dict_get(Pid, couch_file_fd);
+process_info(#ioq_file{fd=Pid}) ->
couch_util:process_dict_get(Pid, couch_file_fd).
update_read_timestamp() ->
diff --git a/src/couch/test/eunit/couch_btree_tests.erl b/src/couch/test/eunit/couch_btree_tests.erl
index c9b791d2c..9e7d3d0d7 100644
--- a/src/couch/test/eunit/couch_btree_tests.erl
+++ b/src/couch/test/eunit/couch_btree_tests.erl
@@ -14,6 +14,7 @@
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
+-include_lib("ioq/include/ioq.hrl").
-define(ROWS, 1000).
-define(TIMEOUT, 60). % seconds
@@ -42,6 +43,8 @@ setup_red() ->
setup_red(_) ->
setup_red().
+teardown(#ioq_file{}=Fd) ->
+ ok = couch_file:close(Fd);
teardown(Fd) when is_pid(Fd) ->
ok = couch_file:close(Fd);
teardown({Fd, _}) ->
@@ -74,6 +77,14 @@ red_test_funs() ->
btree_open_test_() ->
+ {
+ setup,
+ fun() -> test_util:start(?MODULE, [ioq]) end,
+ fun test_util:stop/1,
+ fun btree_should_open/0
+ }.
+
+btree_should_open() ->
{ok, Fd} = couch_file:open(?tempfile(), [create, overwrite]),
{ok, Btree} = couch_btree:open(nil, Fd, [{compression, none}]),
{
diff --git a/src/couch/test/eunit/couch_file_tests.erl b/src/couch/test/eunit/couch_file_tests.erl
index 606f4bbf4..99bb8c057 100644
--- a/src/couch/test/eunit/couch_file_tests.erl
+++ b/src/couch/test/eunit/couch_file_tests.erl
@@ -13,6 +13,7 @@
-module(couch_file_tests).
-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("ioq/include/ioq.hrl").
-define(BLOCK_SIZE, 4096).
-define(setup(F), {setup, fun setup/0, fun teardown/1, F}).
@@ -24,22 +25,52 @@ setup() ->
Fd.
teardown(Fd) ->
- case is_process_alive(Fd) of
+ case is_process_alive(ioq:fd_pid(Fd)) of
true -> ok = couch_file:close(Fd);
false -> ok
end.
+mock_server() ->
+ %%meck:new(config),
+ meck:expect(config, get, fun(Group) ->
+ []
+ end),
+ meck:expect(config, get, fun(_,_) ->
+ undefined
+ end),
+ meck:expect(config, get, fun("ioq2", _, Default) ->
+ Default
+ end),
+ meck:expect(config, get, fun(_, _, Default) ->
+ Default
+ end),
+ %% meck:expect(config, get, fun(_, _, _) ->
+ %% undefined
+ %% end),
+ meck:expect(config, get_integer, fun("ioq2", _, Default) ->
+ Default
+ end),
+ meck:expect(config, get_boolean, fun("ioq2", _, Default) ->
+ Default
+ end).
+
+
+unmock_server(_) ->
+ true = meck:validate(config),
+ ok = meck:unload(config).
+
open_close_test_() ->
{
"Test for proper file open and close",
{
setup,
+ %%fun() -> mock_server(), test_util:start(?MODULE, [ioq]) end, fun(A) -> unmock_server(A), test_util:stop(A) end,
fun() -> test_util:start(?MODULE, [ioq]) end, fun test_util:stop/1,
[
should_return_enoent_if_missed(),
should_ignore_invalid_flags_with_open(),
?setup(fun should_return_pid_on_file_open/1),
- should_close_file_properly(),
+ ?setup(fun should_close_file_properly/0),
?setup(fun should_create_empty_new_files/1)
]
}
@@ -53,7 +84,7 @@ should_ignore_invalid_flags_with_open() ->
couch_file:open(?tempfile(), [create, invalid_option])).
should_return_pid_on_file_open(Fd) ->
- ?_assert(is_pid(Fd)).
+ ?_assert(is_pid(ioq:fd_pid(Fd))).
should_close_file_properly() ->
{ok, Fd} = couch_file:open(?tempfile(), [create, overwrite]),
@@ -138,10 +169,15 @@ should_not_read_beyond_eof(Fd) ->
{ok, Io} = file:open(Filepath, [read, write, binary]),
ok = file:pwrite(Io, Pos, <<0:1/integer, DoubleBin:31/integer>>),
file:close(Io),
- unlink(Fd),
- ExpectedError = {badmatch, {'EXIT', {bad_return_value,
- {read_beyond_eof, Filepath}}}},
- ?_assertError(ExpectedError, couch_file:pread_binary(Fd, Pos)).
+ unlink(ioq:fd_pid(Fd)),
+ unlink(ioq:ioq_pid(Fd)),
+ %% ExpectedError = {badmatch, {'EXIT', {bad_return_value,
+ %% {read_beyond_eof, Filepath}}}},
+ %%ExpectedError = {exit, {{bad_return_value, {read_beyond_eof Filepath,}}, _}, _},
+
+ ?_assertExit(
+ {{bad_return_value, {read_beyond_eof, Filepath}}, _},
+ couch_file:pread_binary(Fd, Pos)).
should_truncate(Fd) ->
{ok, 0, _} = couch_file:append_term(Fd, foo),
@@ -179,10 +215,19 @@ should_not_read_more_than_pread_limit(Fd) ->
{_, Filepath} = couch_file:process_info(Fd),
BigBin = list_to_binary(lists:duplicate(100000, 0)),
{ok, Pos, _Size} = couch_file:append_binary(Fd, BigBin),
- unlink(Fd),
+ unlink(ioq:fd_pid(Fd)),
+ unlink(ioq:ioq_pid(Fd)),
ExpectedError = {badmatch, {'EXIT', {bad_return_value,
{exceed_pread_limit, Filepath, 50000}}}},
- ?_assertError(ExpectedError, couch_file:pread_binary(Fd, Pos)).
+ ?debugFmt("EXPECTED ERROR IS: ~p~n", [ExpectedError]),
+ %%?_assert(couch_file:pread_binary(Fd, Pos)).
+ %%try couch_file:pread_binary(Fd, Pos)
+ %%catch E:R:S -> ?debugFmt("GOT ERROR: ~p || ~p~n~p~n", [E,R,S])
+ %%end,
+ %%?_assertError(ExpectedError, couch_file:pread_binary(Fd, Pos)).
+ ?_assertExit(
+ {{bad_return_value, {exceed_pread_limit, Filepath, 50000}}, _},
+ couch_file:pread_binary(Fd, Pos)).
header_test_() ->
@@ -537,7 +582,8 @@ fsync_error_test_() ->
fsync_raises_errors() ->
- Fd = spawn(fun() -> fake_fsync_fd() end),
+ FdPid = spawn(fun() -> fake_fsync_fd() end),
+ Fd = #ioq_file{fd=FdPid},
?assertError({fsync_error, eio}, couch_file:sync(Fd)).
diff --git a/src/couch_mrview/src/couch_mrview_compactor.erl b/src/couch_mrview/src/couch_mrview_compactor.erl
index d42edc054..82d0629dc 100644
--- a/src/couch_mrview/src/couch_mrview_compactor.erl
+++ b/src/couch_mrview/src/couch_mrview_compactor.erl
@@ -115,7 +115,7 @@ compact(State) ->
compact_view(View, EmptyView, BufferSize, Acc)
end, FinalAcc2, lists:zip(Views, EmptyViews)),
- unlink(EmptyState#mrst.fd),
+ unlink(ioq:fd_pid(EmptyState#mrst.fd)),
{ok, EmptyState#mrst{
id_btree=NewIdBtree,
views=NewViews,
@@ -132,7 +132,7 @@ recompact(#mrst{db_name=DbName, idx_name=IdxName}, 0) ->
recompact(State, RetryCount) ->
Self = self(),
- link(State#mrst.fd),
+ link(ioq:fd_pid(State#mrst.fd)),
{Pid, Ref} = erlang:spawn_monitor(fun() ->
couch_index_updater:update(Self, couch_mrview_index, State)
end),
@@ -144,10 +144,10 @@ recompact_loop(Pid, Ref, State, RetryCount) ->
% We've made progress so reset RetryCount
recompact_loop(Pid, Ref, State2, recompact_retry_count());
{'DOWN', Ref, _, _, {updated, Pid, State2}} ->
- unlink(State#mrst.fd),
+ unlink(ioq:fd_pid(State#mrst.fd)),
{ok, State2};
{'DOWN', Ref, _, _, Reason} ->
- unlink(State#mrst.fd),
+ unlink(ioq:fd_pid(State#mrst.fd)),
couch_log:warning("Error during recompaction: ~r", [Reason]),
recompact(State, RetryCount - 1)
end.
@@ -218,7 +218,7 @@ swap_compacted(OldState, NewState) ->
fd=NewFd
} = NewState,
- link(NewState#mrst.fd),
+ link(ioq:fd_pid(NewState#mrst.fd)),
Ref = erlang:monitor(process, NewState#mrst.fd),
RootDir = couch_index_util:root_dir(),
@@ -232,7 +232,7 @@ swap_compacted(OldState, NewState) ->
ok = couch_file:delete(RootDir, IndexFName),
ok = file:rename(CompactFName, IndexFName),
- unlink(OldState#mrst.fd),
+ unlink(ioq:fd_pid(OldState#mrst.fd)),
erlang:demonitor(OldState#mrst.fd_monitor, [flush]),
{ok, NewState#mrst{fd_monitor=Ref}}.
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl
index d318a3f4a..6486c9375 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -81,7 +81,7 @@ get_signature_from_filename(FileName) ->
get_view(Db, DDoc, ViewName, Args0) ->
case get_view_index_state(Db, DDoc, ViewName, Args0) of
{ok, State, Args2} ->
- Ref = erlang:monitor(process, State#mrst.fd),
+ Ref = erlang:monitor(process, ioq:fd_pid(State#mrst.fd)),
#mrst{language=Lang, views=Views} = State,
{Type, View, Args3} = extract_view(Lang, Args2, ViewName, Views),
check_range(Args3, view_cmp(View)),
@@ -298,7 +298,7 @@ init_state(Db, Fd, State, Header) ->
State#mrst{
fd=Fd,
- fd_monitor=erlang:monitor(process, Fd),
+ fd_monitor=erlang:monitor(process, ioq:fd_pid(Fd)),
update_seq=Seq,
purge_seq=PurgeSeq,
id_btree=IdBtree,