summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2017-08-16 12:17:40 -0500
committerPaul J. Davis <paul.joseph.davis@gmail.com>2017-09-11 15:56:36 -0500
commit7c28a76a1f9044b52252e7127c54cee6b4508218 (patch)
tree4c63ddd85167b6186da831c8c82a53a03f6d57a2
parenta7a66c3b74a932adc5f71e4bc5d046928be61389 (diff)
downloadcouchdb-7c28a76a1f9044b52252e7127c54cee6b4508218.tar.gz
Simplify compaction state management
This change adds a new `#comp_st{}` record that is used to pass compaction state through the various compaction steps. There are zero changes to the existing compaction logic. This merely sets the stage for adding our docid copy optimization.
-rw-r--r--src/couch/src/couch_db_updater.erl169
1 files changed, 125 insertions, 44 deletions
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index f78604864..3142516bc 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -23,9 +23,16 @@
-define(IDLE_LIMIT_DEFAULT, 61000).
+-record(comp_st, {
+ old_db,
+ new_db,
+ meta_fd,
+ retry
+}).
+
-record(comp_header, {
db_header,
- meta_state
+ meta_st
}).
-record(merge_st, {
@@ -1034,56 +1041,90 @@ check_md5(_, _) -> throw(md5_mismatch).
start_copy_compact(#db{}=Db) ->
erlang:put(io_priority, {db_compact, Db#db.name}),
- #db{name=Name, filepath=Filepath, options=Options, header=Header} = Db,
- couch_log:debug("Compaction process spawned for db \"~s\"", [Name]),
+ couch_log:debug("Compaction process spawned for db \"~s\"", [Db#db.name]),
+
+ {ok, InitCompSt} = open_compaction_files(Db),
+
+ Stages = [
+ fun copy_purge_info/1,
+ fun copy_compact/1,
+ fun commit_compaction_data/1,
+ fun sort_meta_data/1,
+ fun commit_compaction_data/1,
+ fun copy_meta_data/1,
+ fun compact_final_sync/1
+ ],
- {ok, NewDb, DName, DFd, MFd, Retry} =
- open_compaction_files(Name, Header, Filepath, Options),
- erlang:monitor(process, MFd),
+ FinalCompSt = lists:foldl(fun(Stage, CompSt) ->
+ Stage(CompSt)
+ end, InitCompSt, Stages),
- % This is a bit worrisome. init_db/4 will monitor the data fd
- % but it doesn't know about the meta fd. For now I'll maintain
- % that the data fd is the old normal fd and meta fd is special
- % and hope everything works out for the best.
- unlink(DFd),
+ #comp_st{
+ new_db = FinalNewDb,
+ meta_fd = MetaFd
+ } = FinalCompSt,
- NewDb1 = copy_purge_info(Db, NewDb),
- NewDb2 = copy_compact(Db, NewDb1, Retry),
- NewDb3 = sort_meta_data(NewDb2),
- NewDb4 = commit_compaction_data(NewDb3),
- NewDb5 = copy_meta_data(NewDb4),
- NewDb6 = sync_header(NewDb5, db_to_header(NewDb5, NewDb5#db.header)),
- close_db(NewDb6),
+ close_db(FinalNewDb),
+ ok = couch_file:close(MetaFd),
- ok = couch_file:close(MFd),
- gen_server:cast(Db#db.main_pid, {compact_done, DName}).
+ gen_server:cast(Db#db.main_pid, {compact_done, FinalNewDb#db.filepath}).
-open_compaction_files(DbName, SrcHdr, DbFilePath, Options) ->
+open_compaction_files(OldDb) ->
+ #db{
+ name = DbName,
+ filepath = DbFilePath,
+ options = Options,
+ header = SrcHdr
+ } = OldDb,
DataFile = DbFilePath ++ ".compact.data",
MetaFile = DbFilePath ++ ".compact.meta",
{ok, DataFd, DataHdr} = open_compaction_file(DataFile),
{ok, MetaFd, MetaHdr} = open_compaction_file(MetaFile),
DataHdrIsDbHdr = couch_db_header:is_header(DataHdr),
- case {DataHdr, MetaHdr} of
+ CompSt = case {DataHdr, MetaHdr} of
{#comp_header{}=A, #comp_header{}=A} ->
+ % We're restarting a compaction that did not finish
+ % before trying to swap out with the original db
DbHeader = A#comp_header.db_header,
Db0 = init_db(DbName, DataFile, DataFd, DbHeader, Options),
- Db1 = bind_emsort(Db0, MetaFd, A#comp_header.meta_state),
- {ok, Db1, DataFile, DataFd, MetaFd, Db0#db.id_tree};
+ Db1 = bind_emsort(Db0, MetaFd, A#comp_header.meta_st),
+ #comp_st{
+ old_db = OldDb,
+ new_db = Db1,
+ meta_fd = MetaFd,
+ retry = Db0#db.id_tree
+ };
_ when DataHdrIsDbHdr ->
+ % We tried to swap out the compaction but there were
+ % writes to the database during compaction. Start
+ % a compaction retry.
ok = reset_compaction_file(MetaFd, couch_db_header:from(SrcHdr)),
Db0 = init_db(DbName, DataFile, DataFd, DataHdr, Options),
Db1 = bind_emsort(Db0, MetaFd, nil),
- {ok, Db1, DataFile, DataFd, MetaFd, Db0#db.id_tree};
+ #comp_st{
+ old_db = OldDb,
+ new_db = Db1,
+ meta_fd = MetaFd,
+ retry = Db0#db.id_tree
+ };
_ ->
+ % We're starting a compaction from scratch
Header = couch_db_header:from(SrcHdr),
ok = reset_compaction_file(DataFd, Header),
ok = reset_compaction_file(MetaFd, Header),
Db0 = init_db(DbName, DataFile, DataFd, Header, Options),
Db1 = bind_emsort(Db0, MetaFd, nil),
- {ok, Db1, DataFile, DataFd, MetaFd, nil}
- end.
+ #comp_st{
+ old_db = OldDb,
+ new_db = Db1,
+ meta_fd = MetaFd,
+ retry = nil
+ }
+ end,
+ unlink(DataFd),
+ erlang:monitor(process, MetaFd),
+ {ok, CompSt}.
open_compaction_file(FilePath) ->
@@ -1104,25 +1145,34 @@ reset_compaction_file(Fd, Header) ->
ok = couch_file:write_header(Fd, Header).
-copy_purge_info(OldDb, NewDb) ->
+copy_purge_info(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
OldHdr = OldDb#db.header,
NewHdr = NewDb#db.header,
OldPurgeSeq = couch_db_header:purge_seq(OldHdr),
- if OldPurgeSeq > 0 ->
+ NewPurgeSeq = couch_db_header:purge_seq(NewHdr),
+ if OldPurgeSeq > NewPurgeSeq ->
{ok, PurgedIdsRevs} = couch_db:get_last_purged(OldDb),
Opts = [{compression, NewDb#db.compression}],
{ok, Ptr, _} = couch_file:append_term(NewDb#db.fd, PurgedIdsRevs, Opts),
- NewNewHdr = couch_db_header:set(NewHdr, [
- {purge_seq, OldPurgeSeq},
- {purged_docs, Ptr}
- ]),
- NewDb#db{header = NewNewHdr};
+ CompSt#comp_st{
+ new_db = NewDb#db{
+ header = couch_db_header:set(NewHdr, [
+ {purge_seq, OldPurgeSeq},
+ {purged_docs, Ptr}
+ ])
+ }
+ };
true ->
- NewDb
+ CompSt
end.
-copy_compact(Db, NewDb0, Retry) ->
+copy_compact(#comp_st{} = CompSt) ->
+ #comp_st{
+ old_db = Db,
+ new_db = NewDb0,
+ retry = Retry
+ } = CompSt,
Compression = couch_compress:get_compression_method(),
NewDb = NewDb0#db{compression=Compression},
TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
@@ -1160,12 +1210,13 @@ copy_compact(Db, NewDb0, Retry) ->
TaskProps0 = [
{type, database_compaction},
+ {retry, (Retry /= nil)},
{database, Db#db.name},
{progress, 0},
{changes_done, 0},
{total_changes, TotalChanges}
],
- case (Retry =/= nil) and couch_task_status:is_task_added() of
+ case (Retry /= nil) and couch_task_status:is_task_added() of
true ->
couch_task_status:update([
{retry, true},
@@ -1195,7 +1246,11 @@ copy_compact(Db, NewDb0, Retry) ->
NewDb4 = NewDb3
end,
- commit_compaction_data(NewDb4#db{update_seq=Db#db.update_seq}).
+ CompSt#comp_st{
+ new_db = NewDb4#db{
+ update_seq = Db#db.update_seq
+ }
+ }.
copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
@@ -1310,10 +1365,15 @@ copy_doc_attachments(#db{fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
{BodyData, NewBinInfos}.
-commit_compaction_data(#db{}=Db) ->
+commit_compaction_data(#comp_st{new_db = Db} = CompSt) ->
% Compaction needs to write headers to both the data file
% and the meta file so if we need to restart we can pick
% back up from where we left off.
+ CompSt#comp_st{
+ new_db = commit_compaction_data(Db)
+ };
+
+commit_compaction_data(#db{} = Db) ->
commit_compaction_data(Db, couch_emsort:get_fd(Db#db.id_tree)),
commit_compaction_data(Db, Db#db.fd).
@@ -1330,7 +1390,7 @@ commit_compaction_data(#db{header=OldHeader}=Db0, Fd) ->
Header = db_to_header(Db1, OldHeader),
CompHeader = #comp_header{
db_header = Header,
- meta_state = MetaState
+ meta_st = MetaState
},
ok = couch_file:sync(Fd),
ok = couch_file:write_header(Fd, CompHeader),
@@ -1359,12 +1419,20 @@ bind_id_tree(Db, Fd, State) ->
Db#db{id_tree=IdBtree}.
-sort_meta_data(Db0) ->
+sort_meta_data(#comp_st{new_db = Db0} = CompSt) ->
{ok, Ems} = couch_emsort:merge(Db0#db.id_tree),
- Db0#db{id_tree=Ems}.
+ CompSt#comp_st{
+ new_db = Db0#db{
+ id_tree = Ems
+ }
+ }.
-copy_meta_data(#db{fd=Fd, header=Header}=Db) ->
+copy_meta_data(#comp_st{new_db = Db} = CompSt) ->
+ #db{
+ fd = Fd,
+ header = Header
+ } = Db,
Src = Db#db.id_tree,
DstState = couch_db_header:id_tree_state(Header),
{ok, IdTree0} = couch_btree:open(DstState, Fd, [
@@ -1384,7 +1452,20 @@ copy_meta_data(#db{fd=Fd, header=Header}=Db) ->
{ok, SeqTree} = couch_btree:add_remove(
Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
),
- Db#db{id_tree=IdTree, seq_tree=SeqTree}.
+ CompSt#comp_st{
+ new_db = Db#db{
+ id_tree = IdTree,
+ seq_tree = SeqTree
+ }
+ }.
+
+
+compact_final_sync(#comp_st{new_db = NewDb0} = CompSt) ->
+ NewHdr = db_to_header(NewDb0, NewDb0#db.header),
+ NewDb1 = sync_header(NewDb0, NewHdr),
+ CompSt#comp_st{
+ new_db = NewDb1
+ }.
merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 ->