summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTony Sun <tony.sun427@gmail.com>2020-09-10 13:35:17 -0700
committerGitHub <noreply@github.com>2020-09-10 13:35:17 -0700
commita94e693f32672e4613bce0d80d0b9660f85275ea (patch)
tree30104903da08e826b064b06115f864e32b9604b9
parent45ddc9350e34c609a0a0b0279d0a70f8a4cbc63b (diff)
downloadcouchdb-a94e693f32672e4613bce0d80d0b9660f85275ea.tar.gz
add remonitor code to DOWN message (#3144)
Smoosh monitors the compactor pid to determine when the compaction jobs finishes, and uses this for its idea of concurrency. However, this isn't accurate in the case where the compaction job has to re-spawn to catch up on intervening changes since the same logical compaction job continues with another pid and smoosh is not aware. In such cases, a smoosh channel with concurrency one can start arbitrarily many additional database compaction jobs. To solve this problem, we added a check to see if a compaction PID exists for a db in `start_compact`. But wee need to add another check because this check is only for shard that comes off the queue. So the following can still occur: 1. Enqueue a bunch of stuff into channel with concurrency 1 2. Begin highest priority job, Shard1, in channel 3. Compaction finishes, discovers compaction file is behind main file 4. Smoosh-monitored PID for Shard1 exits, a new one starts to finish the job 5. Smoosh receives the 'DOWN' message, begins the next highest priority job, Shard2 6. Channel concurrency is now 2, not 1 This change adds another check into the 'DOWN' message so that it checks for that specific shard. If the compaction PID exists then it means a new process was spawned and we just monitor that one and add it back to the queue. The length of the queue does not change and therefore we won’t spawn new compaction jobs.
-rw-r--r--src/smoosh/src/smoosh_channel.erl31
1 files changed, 25 insertions, 6 deletions
diff --git a/src/smoosh/src/smoosh_channel.erl b/src/smoosh/src/smoosh_channel.erl
index d8a8d14a9..2a45c17dc 100644
--- a/src/smoosh/src/smoosh_channel.erl
+++ b/src/smoosh/src/smoosh_channel.erl
@@ -122,10 +122,9 @@ handle_info({'DOWN', Ref, _, Job, Reason}, State0) ->
#state{active=Active0, starting=Starting0} = State,
case lists:keytake(Job, 2, Active0) of
{value, {Key, _Pid}, Active1} ->
- couch_log:warning("exit for compaction of ~p: ~p", [
- smoosh_utils:stringify(Key), Reason]),
- {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [Key]),
- {noreply, maybe_start_compaction(State#state{active=Active1})};
+ State1 = maybe_remonitor_cpid(State#state{active=Active1}, Key,
+ Reason),
+ {noreply, maybe_start_compaction(State1)};
false ->
case lists:keytake(Ref, 1, Starting0) of
{value, {_, Key}, Starting1} ->
@@ -281,8 +280,7 @@ start_compact(State, Db) ->
Ref = erlang:monitor(process, DbPid),
DbPid ! {'$gen_call', {self(), Ref}, start_compact},
State#state{starting=[{Ref, Key}|State#state.starting]};
- % database is still compacting so we can just monitor the existing
- % compaction pid
+ % Compaction is already running, so monitor existing compaction pid.
CPid ->
couch_log:notice("Db ~s continuing compaction",
[smoosh_utils:stringify(Key)]),
@@ -293,6 +291,27 @@ start_compact(State, Db) ->
false
end.
+maybe_remonitor_cpid(State, DbName, Reason) when is_binary(DbName) ->
+ {ok, Db} = couch_db:open_int(DbName, []),
+ case couch_db:get_compactor_pid(Db) of
+ nil ->
+ couch_log:warning("exit for compaction of ~p: ~p",
+ [smoosh_utils:stringify(DbName), Reason]),
+ {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [DbName]),
+ State;
+ CPid ->
+ couch_log:notice("~s compaction already running. Re-monitor Pid ~p",
+ [smoosh_utils:stringify(DbName), CPid]),
+ erlang:monitor(process, CPid),
+ State#state{active=[{DbName, CPid}|State#state.active]}
+ end;
+% not a database compaction, so ignore the pid check
+maybe_remonitor_cpid(State, Key, Reason) ->
+ couch_log:warning("exit for compaction of ~p: ~p",
+ [smoosh_utils:stringify(Key), Reason]),
+ {ok, _} = timer:apply_after(5000, smoosh_server, enqueue, [Key]),
+ State.
+
schedule_unpause() ->
WaitSecs = list_to_integer(config:get("smoosh", "wait_secs", "30")),
erlang:send_after(WaitSecs * 1000, self(), unpause).