summaryrefslogtreecommitdiff
path: root/src/mongo/db/concurrency
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/concurrency')
-rw-r--r--src/mongo/db/concurrency/deferred_writer.cpp26
-rw-r--r--src/mongo/db/concurrency/deferred_writer.h2
2 files changed, 15 insertions, 13 deletions
diff --git a/src/mongo/db/concurrency/deferred_writer.cpp b/src/mongo/db/concurrency/deferred_writer.cpp
index be9facf84ec..9c6abe5ff47 100644
--- a/src/mongo/db/concurrency/deferred_writer.cpp
+++ b/src/mongo/db/concurrency/deferred_writer.cpp
@@ -48,7 +48,6 @@ auto kLogInterval = stdx::chrono::minutes(1);
void DeferredWriter::_logFailure(const Status& status) {
if (TimePoint::clock::now() - _lastLogged > kLogInterval) {
LOGV2(20516,
- "Unable to write to collection {namespace}: {error}",
"Unable to write to collection",
"namespace"_attr = _nss.toString(),
"error"_attr = status);
@@ -100,14 +99,13 @@ StatusWith<std::unique_ptr<AutoGetCollection>> DeferredWriter::_getCollection(
return std::move(agc);
}
-void DeferredWriter::_worker(InsertStatement stmt) try {
+Status DeferredWriter::_worker(InsertStatement stmt) noexcept try {
auto uniqueOpCtx = Client::getCurrent()->makeOperationContext();
OperationContext* opCtx = uniqueOpCtx.get();
auto result = _getCollection(opCtx);
if (!result.isOK()) {
- _logFailure(result.getStatus());
- return;
+ return result.getStatus();
}
auto agc = std::move(result.getValue());
@@ -128,13 +126,9 @@ void DeferredWriter::_worker(InsertStatement stmt) try {
stdx::lock_guard<Latch> lock(_mutex);
_numBytes -= stmt.doc.objsize();
-
- // If a write to a deferred collection fails, periodically tell the log.
- if (!status.isOK()) {
- _logFailure(status);
- }
+ return status;
} catch (const DBException& e) {
- _logFailure(e.toStatus());
+ return e.toStatus();
}
DeferredWriter::DeferredWriter(NamespaceString nss, CollectionOptions opts, int64_t maxSize)
@@ -155,7 +149,12 @@ void DeferredWriter::startup(std::string workerName) {
options.threadNamePrefix = workerName;
options.minThreads = 0;
options.maxThreads = 1;
- options.onCreateThread = [](const std::string& name) { Client::initThread(name); };
+ options.onCreateThread = [](const std::string& name) {
+ Client::initThread(name);
+
+ stdx::lock_guard<Client> lk(cc());
+ cc().setSystemOperationKillableByStepdown(lk);
+ };
_pool = std::make_unique<ThreadPool>(options);
_pool->startup();
}
@@ -190,7 +189,10 @@ bool DeferredWriter::insertDocument(BSONObj obj) {
_pool->schedule([this, obj](auto status) {
fassert(40588, status);
- _worker(InsertStatement(obj.getOwned()));
+ auto workerStatus = _worker(InsertStatement(obj.getOwned()));
+ if (!workerStatus.isOK()) {
+ _logFailure(workerStatus);
+ }
});
return true;
}
diff --git a/src/mongo/db/concurrency/deferred_writer.h b/src/mongo/db/concurrency/deferred_writer.h
index 0ac8238fa8d..1f45fbb8d15 100644
--- a/src/mongo/db/concurrency/deferred_writer.h
+++ b/src/mongo/db/concurrency/deferred_writer.h
@@ -136,7 +136,7 @@ private:
/**
* The method that the worker thread will run.
*/
- void _worker(InsertStatement stmt);
+ Status _worker(InsertStatement stmt) noexcept;
/**
* The options for the collection, in case we need to create it.