summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorIan Kuehne <ian.kuehne@mongodb.com>2017-08-08 10:38:37 -0400
committerIan Kuehne <ian.kuehne@mongodb.com>2017-08-08 13:35:48 -0400
commit3a2a10fa15a98f13bdf25697bafb0781987ebe07 (patch)
tree9a34984362037e2a5f1746612377221cf3df299d /src/mongo
parent2d4c9492b8d2f0190fef1b7b758e49d63ef4659e (diff)
downloadmongo-3a2a10fa15a98f13bdf25697bafb0781987ebe07.tar.gz
SERVER-30378 Add DeferredWriter.
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/concurrency/SConscript16
-rw-r--r--src/mongo/db/concurrency/deferred_writer.cpp173
-rw-r--r--src/mongo/db/concurrency/deferred_writer.h164
-rw-r--r--src/mongo/dbtests/SConscript2
-rw-r--r--src/mongo/dbtests/deferred_writer.cpp376
5 files changed, 731 insertions, 0 deletions
diff --git a/src/mongo/db/concurrency/SConscript b/src/mongo/db/concurrency/SConscript
index 6b837ecb68f..8f8a703b388 100644
--- a/src/mongo/db/concurrency/SConscript
+++ b/src/mongo/db/concurrency/SConscript
@@ -5,6 +5,22 @@ Import("env")
env = env.Clone()
env.Library(
+ target='deferred_writer',
+ source=[
+ 'deferred_writer.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/catalog/collection',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/catalog/catalog_helpers',
+ '$BUILD_DIR/mongo/db/db_raii',
+ '$BUILD_DIR/mongo/util/concurrency/thread_pool',
+ ],
+)
+
+env.Library(
target='write_conflict_exception',
source=[
'write_conflict_exception.cpp'
diff --git a/src/mongo/db/concurrency/deferred_writer.cpp b/src/mongo/db/concurrency/deferred_writer.cpp
new file mode 100644
index 00000000000..0ffa4a7c353
--- /dev/null
+++ b/src/mongo/db/concurrency/deferred_writer.cpp
@@ -0,0 +1,173 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kWrite
+
+#include "mongo/db/concurrency/deferred_writer.h"
+#include "mongo/db/catalog/create_collection.h"
+#include "mongo/db/client.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/util/concurrency/idle_thread_block.h"
+#include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
+namespace {
+auto kLogInterval = stdx::chrono::hours(1);
+}
+
+void DeferredWriter::_logFailure(const Status& status) {
+ if (TimePoint::clock::now() - _lastLogged > kLogInterval) {
+ log() << "Unable to write to collection " << _nss.toString() << ": " << status.toString();
+ _lastLogged = stdx::chrono::system_clock::now();
+ }
+}
+
+Status DeferredWriter::_makeCollection(OperationContext* opCtx) {
+ BSONObjBuilder builder;
+ builder.append("create", _nss.coll());
+ builder.appendElements(_collectionOptions.toBSON());
+ try {
+ return createCollection(opCtx, _nss.db().toString(), builder.obj().getOwned());
+ } catch (const DBException& exception) {
+ return exception.toStatus();
+ }
+}
+
+StatusWith<std::unique_ptr<AutoGetCollection>> DeferredWriter::_getCollection(
+ OperationContext* opCtx) {
+ std::unique_ptr<AutoGetCollection> agc;
+ agc = stdx::make_unique<AutoGetCollection>(opCtx, _nss, MODE_IX);
+
+ while (!agc->getCollection()) {
+ // Release the previous AGC's lock before trying to rebuild the collection.
+ agc.reset();
+ Status status = _makeCollection(opCtx);
+
+ if (!status.isOK()) {
+ return status;
+ }
+
+ agc = stdx::make_unique<AutoGetCollection>(opCtx, _nss, MODE_IX);
+ }
+
+ return std::move(agc);
+}
+
+void DeferredWriter::_worker(InsertStatement stmt) {
+ auto uniqueOpCtx = Client::getCurrent()->makeOperationContext();
+ OperationContext* opCtx = uniqueOpCtx.get();
+ auto result = _getCollection(opCtx);
+
+ if (!result.isOK()) {
+ _logFailure(result.getStatus());
+ return;
+ }
+
+ auto agc = std::move(result.getValue());
+
+ Collection& collection = *agc->getCollection();
+
+ Status status = writeConflictRetry(opCtx, "deferred insert", _nss.ns(), [&] {
+ WriteUnitOfWork wuow(opCtx);
+ Status status = collection.insertDocument(opCtx, stmt, nullptr, false);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ wuow.commit();
+ return Status::OK();
+ });
+
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+ _numBytes -= stmt.doc.objsize();
+
+ // If a write to a deferred collection fails, periodically tell the log.
+ if (!status.isOK()) {
+ _logFailure(status);
+ }
+}
+
+DeferredWriter::DeferredWriter(NamespaceString nss, CollectionOptions opts, int64_t maxSize)
+ : _collectionOptions(opts),
+ _maxNumBytes(maxSize),
+ _nss(nss),
+ _numBytes(0),
+ _lastLogged(TimePoint::clock::now() - kLogInterval) {}
+
+DeferredWriter::~DeferredWriter() {}
+
+void DeferredWriter::startup(std::string workerName) {
+ // We should only start up once.
+ invariant(!_pool);
+ ThreadPool::Options options;
+ options.poolName = "deferred writer pool";
+ options.threadNamePrefix = workerName;
+ options.minThreads = 0;
+ options.maxThreads = 1;
+ options.onCreateThread = [](const std::string& name) { Client::initThread(name); };
+ _pool = stdx::make_unique<ThreadPool>(options);
+ _pool->startup();
+}
+
+void DeferredWriter::shutdown(void) {
+ // If we never allocated the pool, no cleanup is necessary.
+ if (!_pool) {
+ return;
+ }
+
+ _pool->waitForIdle();
+ _pool->shutdown();
+ _pool->join();
+}
+
+bool DeferredWriter::insertDocument(BSONObj obj) {
+ // We can't insert documents if we haven't been started up.
+ invariant(_pool);
+
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+ // Check if we're allowed to insert this object.
+ if (_numBytes + obj.objsize() >= _maxNumBytes) {
+ // If not, drop it. We always drop new entries rather than old ones; that way the caller
+ // knows at the time of the call that the entry was dropped.
+ return false;
+ }
+
+ // Add the object to the buffer.
+ _numBytes += obj.objsize();
+ fassertStatusOK(40588,
+ _pool->schedule([this, obj] { _worker(InsertStatement(obj.getOwned())); }));
+ return true;
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/concurrency/deferred_writer.h b/src/mongo/db/concurrency/deferred_writer.h
new file mode 100644
index 00000000000..c6721bff0c2
--- /dev/null
+++ b/src/mongo/db/concurrency/deferred_writer.h
@@ -0,0 +1,164 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/catalog/collection.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/stdx/mutex.h"
+
+namespace mongo {
+
+class AutoGetCollection;
+class ThreadPool;
+
+/**
+ * Provides an interface for asynchronously adding to a collection.
+ *
+ * Allows writes to a collection in a context without appropriate locks by buffering them in memory
+ * and asynchronously writing them to the backing collection. Useful when an operation with e.g. a
+ * global MODE_S lock needs to write, but doesn't care that the write shows up immediately.
+ * Motivated by the local health log. For obvious reasons, cannot provide strong durability
+ * guarantees, and cannot report whether the insert succeeded--in other words, this class provides
+ * eventual "best effort" inserts.
+ *
+ * Because this class is motivated by the health log and errors cannot be cleanly reported to the
+ * caller, it cannot report most errors to the client; it instead periodically logs any errors to
+ * the system log.
+ *
+ * Instances of this class are unconditionally thread-safe, and cannot cause deadlock barring
+ * improper use of the ctor, `flush` and `shutdown` methods below.
+ */
+class DeferredWriter {
+ MONGO_DISALLOW_COPYING(DeferredWriter);
+
+public:
+ /**
+ * Create a new DeferredWriter for writing to a given collection.
+ *
+ * Will not begin writing to the backing collection until `startup` is called.
+ *
+ * @param opts The options to use when creating the backing collection if it doesn't exist.
+ * @param maxSize the maximum number of bytes to store in the buffer.
+ */
+ DeferredWriter(NamespaceString nss, CollectionOptions opts, int64_t maxSize);
+
+ /**
+ * Start the background worker thread writing to the given collection.
+ *
+ * @param workerName The name of the client associated with the worker thread.
+ */
+ void startup(std::string workerName);
+
+ /**
+ * Flush the buffer and `join` the worker thread.
+ *
+ * IMPORTANT: Must be called before destruction if `startup` has been called.
+ *
+ * Blocks until buffered writes complete. Must not be called repeatedly.
+ */
+ void shutdown(void);
+
+ /**
+ * Cleans up the writer.
+ *
+ * Does not clean up the worker thread; call `shutdown` for that. Instead, if the worker thread
+ * is still running calls std::terminate, which crashes the server.
+ */
+ ~DeferredWriter();
+
+ /**
+ * Deferred-insert the given object.
+ *
+ * Returns whether the object was successfully pushed onto the in-memory buffer (*not* whether
+ * it was successfully added to the underlying collection). Creates the backing collection if
+ * it doesn't exist.
+ */
+ bool insertDocument(BSONObj obj);
+
+private:
+ /**
+ * Log failure, but only if a certain interval has passed since the last log.
+ */
+ void _logFailure(const Status& status);
+
+ /**
+ * Create the backing collection if it doesn't exist.
+ *
+ * Return whether creation succeeded.
+ */
+ Status _makeCollection(OperationContext* opCtx);
+
+ /**
+ * Ensure that the backing collection exists, and pass back a lock and handle to it.
+ */
+ StatusWith<std::unique_ptr<AutoGetCollection>> _getCollection(OperationContext* opCtx);
+
+ /**
+ * The method that the worker thread will run.
+ */
+ void _worker(InsertStatement stmt);
+
+ /**
+ * The options for the collection, in case we need to create it.
+ */
+ const CollectionOptions _collectionOptions;
+
+ /**
+ * The size limit of the in-memory buffer.
+ */
+ const int64_t _maxNumBytes;
+
+ /**
+ * The name of the backing collection.
+ */
+ const NamespaceString _nss;
+
+ std::unique_ptr<ThreadPool> _pool;
+
+ /**
+ * Guards all non-const, non-thread-safe members.
+ */
+ stdx::mutex _mutex;
+
+ /**
+ * The number of bytes currently in the in-memory buffer.
+ */
+ int64_t _numBytes;
+
+ /**
+ * Time we last logged that we can't write to the underlying collection.
+ *
+ * Ensures we don't flood the log with such entries.
+ */
+ using TimePoint = stdx::chrono::time_point<stdx::chrono::system_clock>;
+ TimePoint _lastLogged;
+};
+
+} // namespace mongo
diff --git a/src/mongo/dbtests/SConscript b/src/mongo/dbtests/SConscript
index ec3db5a0300..1db90bb2605 100644
--- a/src/mongo/dbtests/SConscript
+++ b/src/mongo/dbtests/SConscript
@@ -59,6 +59,7 @@ dbtest = env.Program(
'counttests.cpp',
'dbhelper_tests.cpp',
'dbtests.cpp',
+ 'deferred_writer.cpp',
'directclienttests.cpp',
'documentsourcetests.cpp',
'executor_registry.cpp',
@@ -117,6 +118,7 @@ dbtest = env.Program(
"$BUILD_DIR/mongo/bson/mutable/mutable_bson_test_utils",
"$BUILD_DIR/mongo/db/auth/authmocks",
"$BUILD_DIR/mongo/db/bson/dotted_path_support",
+ "$BUILD_DIR/mongo/db/concurrency/deferred_writer",
"$BUILD_DIR/mongo/db/op_observer_d",
"$BUILD_DIR/mongo/db/query/collation/collator_interface_mock",
"$BUILD_DIR/mongo/db/query/query",
diff --git a/src/mongo/dbtests/deferred_writer.cpp b/src/mongo/dbtests/deferred_writer.cpp
new file mode 100644
index 00000000000..6a7641bd033
--- /dev/null
+++ b/src/mongo/dbtests/deferred_writer.cpp
@@ -0,0 +1,376 @@
+/**
+ * Copyright (C) 2017 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <chrono>
+
+#include "mongo/bson/simple_bsonobj_comparator.h"
+#include "mongo/db/client.h"
+#include "mongo/db/client.h"
+#include "mongo/db/concurrency/deferred_writer.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/query/internal_plans.h"
+#include "mongo/dbtests/dbtests.h"
+#include "mongo/stdx/chrono.h"
+
+namespace deferred_writer_tests {
+
+namespace {
+AtomicWord<int64_t> counter;
+
+// Get a new well-formed object with a unique _id field.
+BSONObj getObj(void) {
+ return BSON("_id" << counter.fetchAndAdd(1));
+}
+
+// An STL-style Compare for BSONObjs.
+struct BSONObjCompare {
+ bool operator()(const BSONObj& lhs, const BSONObj& rhs) const {
+ return SimpleBSONObjComparator::kInstance.compare(lhs, rhs) < 0;
+ }
+};
+}
+
+static const NamespaceString kTestNamespace("unittests", "deferred_writer_tests");
+
+/**
+ * For exception-safe code with DeferredWriter.
+ *
+ * If a test fails with a DeferredWriter wrapped in one of these it doesn't crash the server. Note
+ * that this is not in general a good idea, because it has a potentially blocking destructor.
+ */
+class RaiiWrapper {
+public:
+ explicit RaiiWrapper(std::unique_ptr<DeferredWriter> writer) : _writer(std::move(writer)) {
+ _writer->startup("DeferredWriter test");
+ }
+
+ RaiiWrapper(RaiiWrapper&& other) : _writer(std::move(other._writer)) {}
+
+ ~RaiiWrapper() {
+ _writer->shutdown();
+ }
+
+ DeferredWriter* get(void) {
+ return _writer.get();
+ }
+
+private:
+ std::unique_ptr<DeferredWriter> _writer;
+};
+
+/**
+ * Provides some handy utilities.
+ */
+class DeferredWriterTestBase {
+public:
+ DeferredWriterTestBase() : _client(_opCtx.get()) {}
+
+ virtual ~DeferredWriterTestBase() {}
+
+ void createCollection(void) {
+ _client.createCollection(kTestNamespace.toString());
+ }
+
+ void dropCollection(void) {
+ if (AutoGetCollection(_opCtx.get(), kTestNamespace, MODE_IS).getCollection()) {
+ _client.dropCollection(kTestNamespace.toString());
+ }
+ }
+
+ void ensureEmpty(void) {
+ dropCollection();
+ createCollection();
+ }
+
+ /**
+ * Just read the whole collection into memory.
+ */
+ std::vector<BSONObj> readCollection(void) {
+ AutoGetCollection agc(_opCtx.get(), kTestNamespace, MODE_IS);
+ ASSERT_TRUE(agc.getCollection());
+
+ auto plan = InternalPlanner::collectionScan(
+ _opCtx.get(), kTestNamespace.ns(), agc.getCollection(), PlanExecutor::NO_YIELD);
+
+ std::vector<BSONObj> result;
+ BSONObj i;
+ while (plan->getNext(&i, nullptr) == PlanExecutor::ExecState::ADVANCED) {
+ result.push_back(i);
+ }
+
+ return result;
+ }
+
+ /**
+ * Get a writer to the test collection.
+ */
+ RaiiWrapper getWriter(CollectionOptions options = CollectionOptions(),
+ int64_t maxSize = 200'000) {
+ return RaiiWrapper(stdx::make_unique<DeferredWriter>(kTestNamespace, options, maxSize));
+ }
+
+ virtual void run(void) = 0;
+
+protected:
+ const ServiceContext::UniqueOperationContext _opCtx = cc().makeOperationContext();
+ DBDirectClient _client;
+};
+
+/**
+ * Launch a bunch of threads and wait until they all finish.
+ */
+class ThreadLauncher {
+public:
+ template <typename T, typename... Args>
+ void launch(int count, T&& task, Args&&... args) {
+ for (int i = 0; i < count; ++i) {
+ threads.push_back(stdx::thread(task, args...));
+ }
+ }
+
+ void await(void) {
+ for (auto& thread : threads) {
+ thread.join();
+ }
+ }
+
+private:
+ std::vector<stdx::thread> threads;
+};
+
+/**
+ * Test that the deferred writer will create the collection if it is empty.
+ */
+class DeferredWriterTestEmpty : public DeferredWriterTestBase {
+public:
+ ~DeferredWriterTestEmpty(){};
+
+ void run() {
+ {
+ auto gw = getWriter();
+ auto writer = gw.get();
+ writer->insertDocument(getObj());
+ }
+ ASSERT_TRUE(AutoGetCollection(_opCtx.get(), kTestNamespace, MODE_IS).getCollection());
+ ASSERT_TRUE(readCollection().size() == 1);
+ }
+};
+
+/**
+ * Test that concurrent inserts to the DeferredWriter work, and never drop writes.
+ */
+class DeferredWriterTestConcurrent : public DeferredWriterTestBase {
+public:
+ ~DeferredWriterTestConcurrent(){};
+
+ void worker(DeferredWriter* writer) {
+ for (int i = 0; i < kDocsPerWorker; ++i) {
+ writer->insertDocument(getObj());
+ }
+ }
+
+ void run() {
+ ensureEmpty();
+ {
+ auto gw = getWriter();
+ auto writer = gw.get();
+ ThreadLauncher launcher;
+ // Launch some threads inserting into the writer.
+ launcher.launch(kNWorkers, &DeferredWriterTestConcurrent::worker, this, writer);
+ launcher.await();
+ }
+ ASSERT_EQ(readCollection().size(), (size_t)(kNWorkers * kDocsPerWorker));
+ }
+
+private:
+ static const int kNWorkers = 20;
+ static const int kDocsPerWorker = 100;
+};
+
+bool compareBsonObjects(const BSONObj& lhs, const BSONObj& rhs) {
+ return SimpleBSONObjComparator::kInstance.compare(lhs, rhs) < 0;
+}
+
+/**
+ * Test that the documents make it through the writer unchanged.
+ */
+class DeferredWriterTestConsistent : public DeferredWriterTestBase {
+public:
+ ~DeferredWriterTestConsistent() {}
+
+ void run() {
+ ensureEmpty();
+ {
+ auto gw = getWriter();
+ auto writer = gw.get();
+ for (int i = 0; i < 1000; ++i) {
+ auto obj = getObj();
+ _added.insert(obj);
+ writer->insertDocument(obj);
+ }
+ }
+
+ auto contents = readCollection();
+ BSONSet found(contents.begin(), contents.end());
+
+ // Check set equality between found and _added.
+ auto i1 = found.begin();
+ for (auto i2 = _added.begin(); i2 != _added.end(); ++i2) {
+ ASSERT_TRUE(i1 != found.end());
+ ASSERT_EQ(SimpleBSONObjComparator::kInstance.compare(*i1, *i2), 0);
+ ++i1;
+ }
+ ASSERT_TRUE(i1 == found.end());
+ }
+
+private:
+ using BSONSet = std::set<BSONObj, BSONObjCompare>;
+ BSONSet _added;
+};
+
+/**
+ * Test that the writer works when a global X lock is held by the caller.
+ */
+class DeferredWriterTestNoDeadlock : public DeferredWriterTestBase {
+public:
+ void run(void) {
+ int nDocs = 1000;
+ ensureEmpty();
+ {
+ auto gw = getWriter();
+ auto writer = gw.get();
+ Lock::GlobalWrite lock(_opCtx.get());
+
+ for (int i = 0; i < nDocs; ++i) {
+ writer->insertDocument(getObj());
+ }
+
+ // Make sure it hasn't added to the collection under our X lock.
+ ASSERT_EQ((size_t)0, readCollection().size());
+ }
+ ASSERT_EQ((size_t)nDocs, readCollection().size());
+ }
+};
+
+/**
+ * Test that the DeferredWriter rejects documents over the buffer size.
+ */
+class DeferredWriterTestCap : public DeferredWriterTestBase {
+public:
+ void run(void) {
+ // Add a few hundred documents.
+ int maxDocs = 500;
+ // (more than can fit in a 2KB buffer).
+ int bufferSize = 2'000;
+
+ // Keep track of what we add.
+ int bytesAdded = 0;
+ int nAdded = 0;
+ bool hitCap = false;
+
+ ensureEmpty();
+ {
+ auto gw = getWriter(CollectionOptions(), bufferSize);
+ auto writer = gw.get();
+ // Don't let it flush the buffer while we're working.
+ Lock::GlobalWrite lock(_opCtx.get());
+ for (int i = 0; i < maxDocs; ++i) {
+ auto obj = getObj();
+
+ if (bytesAdded + obj.objsize() > bufferSize) {
+ // Should return false when we exceed the buffer size.
+ ASSERT(!writer->insertDocument(obj));
+ hitCap = true;
+ } else {
+ ASSERT(writer->insertDocument(obj));
+ bytesAdded += obj.objsize();
+ ++nAdded;
+ }
+ }
+
+ // These documents should definitely exceed the buffer size.
+ ASSERT(hitCap);
+ }
+ // Make sure it didn't add any of the rejected documents.
+ ASSERT_EQ(readCollection().size(), (size_t)nAdded);
+ }
+};
+
+/**
+ * Test that the inserts are sometimes actually executed without flushing.
+ */
+class DeferredWriterTestAsync : public DeferredWriterTestBase {
+public:
+ void worker(DeferredWriter* writer) {
+ for (int i = 0; i < kDocsPerWorker; ++i) {
+ writer->insertDocument(getObj());
+ }
+ }
+
+ void run(void) {
+ using namespace std::chrono_literals;
+ ensureEmpty();
+ ThreadLauncher launcher;
+ auto gw = getWriter();
+ auto writer = gw.get();
+ launcher.launch(kNWorkers, &DeferredWriterTestAsync::worker, this, writer);
+ launcher.await();
+
+ auto start = stdx::chrono::system_clock::now();
+
+ // Spin-wait for one minute or until something has been added to the collection.
+ while (stdx::chrono::system_clock::now() - start < 1min && readCollection().size() == 0) {
+ stdx::this_thread::yield();
+ }
+
+ // Buffer should have flushed by now.
+ ASSERT_GT(readCollection().size(), (size_t)0);
+ }
+
+private:
+ static const int kNWorkers = 20;
+ static const int kDocsPerWorker = 100;
+};
+
+class DeferredWriterTests : public Suite {
+public:
+ DeferredWriterTests() : Suite("deferred_writer_tests") {}
+
+ void setupTests() {
+ add<DeferredWriterTestEmpty>();
+ add<DeferredWriterTestConcurrent>();
+ add<DeferredWriterTestConsistent>();
+ add<DeferredWriterTestNoDeadlock>();
+ add<DeferredWriterTestCap>();
+ add<DeferredWriterTestAsync>();
+ }
+} deferredWriterTests;
+}