summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorEric Milkie <milkie@10gen.com>2017-08-29 13:51:15 -0400
committerEric Milkie <milkie@10gen.com>2017-09-12 16:04:57 -0400
commit6264d36ac6002b296aa41b8dc79400fcc2cbdd74 (patch)
treef1c0971779064bbf6c3fde535b7666d8738f236d /src/mongo
parent978521eb3926867b30903781fd89d4acd931f0c4 (diff)
downloadmongo-6264d36ac6002b296aa41b8dc79400fcc2cbdd74.tar.gz
SERVER-30827 SERVER-30639 Timestamp bulk writes via changes to optime generator
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/SConscript8
-rw-r--r--src/mongo/db/catalog/SConscript13
-rw-r--r--src/mongo/db/catalog/collection.h16
-rw-r--r--src/mongo/db/catalog/collection_impl.cpp18
-rw-r--r--src/mongo/db/catalog/collection_impl.h2
-rw-r--r--src/mongo/db/catalog/database_impl.cpp2
-rw-r--r--src/mongo/db/commands/SConscript4
-rw-r--r--src/mongo/db/concurrency/d_concurrency.cpp15
-rw-r--r--src/mongo/db/concurrency/d_concurrency.h4
-rw-r--r--src/mongo/db/concurrency/lock_manager_defs.h18
-rw-r--r--src/mongo/db/concurrency/lock_state.cpp6
-rw-r--r--src/mongo/db/keys_collection_manager_sharding_test.cpp4
-rw-r--r--src/mongo/db/logical_clock_test.cpp2
-rw-r--r--src/mongo/db/ops/SConscript18
-rw-r--r--src/mongo/db/ops/write_ops_exec.cpp27
-rw-r--r--src/mongo/db/repair_database.cpp8
-rw-r--r--src/mongo/db/repl/SConscript38
-rw-r--r--src/mongo/db/repl/initial_syncer.cpp32
-rw-r--r--src/mongo/db/repl/initial_syncer.h7
-rw-r--r--src/mongo/db/repl/initial_syncer_test.cpp38
-rw-r--r--src/mongo/db/repl/oplog.cpp155
-rw-r--r--src/mongo/db/repl/oplog.h40
-rw-r--r--src/mongo/db/repl/oplogreader.cpp2
-rw-r--r--src/mongo/db/repl/oplogreader.h3
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp3
-rw-r--r--src/mongo/db/repl/storage_interface.h7
-rw-r--r--src/mongo/db/repl/storage_interface_impl.cpp28
-rw-r--r--src/mongo/db/repl/storage_interface_impl.h2
-rw-r--r--src/mongo/db/repl/storage_interface_mock.h9
-rw-r--r--src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp16
30 files changed, 328 insertions, 217 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 204dcfe02fc..4128794d1c1 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -594,12 +594,14 @@ env.Library(
'curop_metrics',
'diag_log',
'lasterror',
- 'write_ops',
'ops/write_ops_parsers',
'rw_concern_d',
's/sharding',
'storage/storage_options',
],
+ LIBDEPS_PRIVATE=[
+ 'ops/write_ops_exec',
+ ],
)
env.Library(
@@ -735,6 +737,9 @@ env.Library(
'$BUILD_DIR/mongo/db/catalog/database',
"$BUILD_DIR/mongo/db/storage/mmap_v1/storage_mmapv1",
],
+ LIBDEPS_PRIVATE=[
+ "$BUILD_DIR/mongo/db/repl/oplog",
+ ],
)
env.Library(
@@ -1505,7 +1510,6 @@ env.Library(
'ops/update.cpp',
'ops/update_lifecycle_impl.cpp',
'ops/update_result.cpp',
- 'ops/write_ops_exec.cpp',
'ops/write_ops_retryability.cpp',
'session.cpp',
'session_catalog.cpp',
diff --git a/src/mongo/db/catalog/SConscript b/src/mongo/db/catalog/SConscript
index 1a06fe1ba80..6480a091972 100644
--- a/src/mongo/db/catalog/SConscript
+++ b/src/mongo/db/catalog/SConscript
@@ -229,22 +229,23 @@ env.Library(
'index_create',
'index_key_validate',
'$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/background',
'$BUILD_DIR/mongo/db/clientcursor',
+ '$BUILD_DIR/mongo/db/collection_index_usage_tracker',
'$BUILD_DIR/mongo/db/concurrency/lock_manager',
'$BUILD_DIR/mongo/db/curop',
+ '$BUILD_DIR/mongo/db/db_raii',
+ '$BUILD_DIR/mongo/db/index/index_access_methods',
'$BUILD_DIR/mongo/db/query/query',
'$BUILD_DIR/mongo/db/repl/drop_pending_collection_reaper',
+ '$BUILD_DIR/mongo/db/repl/oplog',
'$BUILD_DIR/mongo/db/repl/serveronly',
+ '$BUILD_DIR/mongo/db/s/balancer',
'$BUILD_DIR/mongo/db/service_context',
- '$BUILD_DIR/mongo/db/storage/mmap_v1/storage_mmapv1',
'$BUILD_DIR/mongo/db/storage/key_string',
+ '$BUILD_DIR/mongo/db/storage/mmap_v1/storage_mmapv1',
'$BUILD_DIR/mongo/db/system_index',
'$BUILD_DIR/mongo/db/ttl_collection_cache',
- '$BUILD_DIR/mongo/db/collection_index_usage_tracker',
- '$BUILD_DIR/mongo/db/background',
- '$BUILD_DIR/mongo/db/db_raii',
- '$BUILD_DIR/mongo/db/index/index_access_methods',
- '$BUILD_DIR/mongo/db/s/balancer',
'$BUILD_DIR/mongo/db/views/views_mongod',
],
)
diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h
index df30097f6ca..fd0f9b3b1b4 100644
--- a/src/mongo/db/catalog/collection.h
+++ b/src/mongo/db/catalog/collection.h
@@ -48,6 +48,7 @@
#include "mongo/db/op_observer.h"
#include "mongo/db/query/collation/collator_interface.h"
#include "mongo/db/record_id.h"
+#include "mongo/db/repl/oplog.h"
#include "mongo/db/storage/capped_callback.h"
#include "mongo/db/storage/record_store.h"
#include "mongo/db/storage/snapshot.h"
@@ -96,21 +97,6 @@ struct CompactStats {
long long corruptDocuments = 0;
};
-struct InsertStatement {
-public:
- InsertStatement() = default;
- explicit InsertStatement(BSONObj toInsert) : doc(toInsert) {}
-
- InsertStatement(StmtId statementId, BSONObj toInsert) : stmtId(statementId), doc(toInsert) {}
- InsertStatement(StmtId statementId, BSONObj toInsert, SnapshotName ts)
- : stmtId(statementId), timestamp(ts), doc(toInsert) {}
- InsertStatement(BSONObj toInsert, SnapshotName ts) : timestamp(ts), doc(toInsert) {}
-
- StmtId stmtId = kUninitializedStmtId;
- SnapshotName timestamp = SnapshotName();
- BSONObj doc;
-};
-
/**
* Queries with the awaitData option use this notifier object to wait for more data to be
* inserted into the capped collection.
diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp
index 5896b3b9f46..d78c3e6f43d 100644
--- a/src/mongo/db/catalog/collection_impl.cpp
+++ b/src/mongo/db/catalog/collection_impl.cpp
@@ -170,7 +170,6 @@ CollectionImpl::CollectionImpl(Collection* _this_init,
_cursorManager(_ns),
_cappedNotifier(_recordStore->isCapped() ? stdx::make_unique<CappedInsertNotifier>()
: nullptr),
- _mustTakeCappedLockOnInsert(isCapped() && !_ns.isSystemDotProfile() && !_ns.isOplog()),
_this(_this_init) {}
void CollectionImpl::init(OperationContext* opCtx) {
@@ -305,7 +304,6 @@ Status CollectionImpl::insertDocumentsForOplog(OperationContext* opCtx,
// because it would defeat the purpose of using DocWriter.
invariant(!_validator);
invariant(!_indexCatalog.haveAnyIndexes());
- invariant(!_mustTakeCappedLockOnInsert);
Status status = _recordStore->insertRecordsWithDocWriter(opCtx, docs, timestamps, nDocs);
if (!status.isOK())
@@ -355,9 +353,6 @@ Status CollectionImpl::insertDocuments(OperationContext* opCtx,
const SnapshotId sid = opCtx->recoveryUnit()->getSnapshotId();
- if (_mustTakeCappedLockOnInsert)
- synchronizeOnCappedInFlightResource(opCtx->lockState(), _ns);
-
Status status = _insertDocuments(opCtx, begin, end, enforceQuota, opDebug);
if (!status.isOK())
return status;
@@ -407,8 +402,6 @@ Status CollectionImpl::insertDocument(OperationContext* opCtx,
dassert(opCtx->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IX));
- if (_mustTakeCappedLockOnInsert)
- synchronizeOnCappedInFlightResource(opCtx->lockState(), _ns);
// TODO SERVER-30638: using timestamp 0 for these inserts, which are non-oplog so we don't yet
// care about their correct timestamps.
StatusWith<RecordId> loc = _recordStore->insertRecord(
@@ -425,7 +418,14 @@ Status CollectionImpl::insertDocument(OperationContext* opCtx,
}
vector<InsertStatement> inserts;
- inserts.emplace_back(doc);
+ OplogSlot slot;
+ // Fetch a new optime now, if necessary.
+ auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ if (!replCoord->isOplogDisabledFor(opCtx, _ns)) {
+ // Populate 'slot' with a new optime.
+ repl::getNextOpTimes(opCtx, 1, &slot);
+ }
+ inserts.emplace_back(kUninitializedStmtId, doc, slot);
getGlobalServiceContext()->getOpObserver()->onInserts(
opCtx, ns(), uuid(), inserts.begin(), inserts.end(), false);
@@ -468,7 +468,7 @@ Status CollectionImpl::_insertDocuments(OperationContext* opCtx,
for (auto it = begin; it != end; it++) {
Record record = {RecordId(), RecordData(it->doc.objdata(), it->doc.objsize())};
records.push_back(record);
- Timestamp timestamp = Timestamp(it->timestamp.asU64());
+ Timestamp timestamp = Timestamp(it->oplogSlot.opTime.getTimestamp());
timestamps.push_back(timestamp);
}
Status status =
diff --git a/src/mongo/db/catalog/collection_impl.h b/src/mongo/db/catalog/collection_impl.h
index 4a66cb1a8bc..f8db017330e 100644
--- a/src/mongo/db/catalog/collection_impl.h
+++ b/src/mongo/db/catalog/collection_impl.h
@@ -464,8 +464,6 @@ private:
// This is non-null if and only if the collection is a capped collection.
const std::shared_ptr<CappedInsertNotifier> _cappedNotifier;
- const bool _mustTakeCappedLockOnInsert;
-
// The earliest snapshot that is allowed to use this collection.
boost::optional<SnapshotName> _minVisibleSnapshot;
diff --git a/src/mongo/db/catalog/database_impl.cpp b/src/mongo/db/catalog/database_impl.cpp
index d57e4352b90..6096e81106f 100644
--- a/src/mongo/db/catalog/database_impl.cpp
+++ b/src/mongo/db/catalog/database_impl.cpp
@@ -161,7 +161,7 @@ void DatabaseImpl::close(OperationContext* opCtx, const std::string& reason) {
// XXX? - Do we need to close the database under the global lock, or is the DB lock sufficient?
invariant(opCtx->lockState()->isW());
- // oplog caches some things, dirty its caches
+ // Clear cache of oplog Collection pointer.
repl::oplogCheckCloseDatabase(opCtx, this->_this);
if (BackgroundOperation::inProgForDb(_name)) {
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 527ae13670c..3efcaa19f17 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -216,6 +216,7 @@ env.Library(
'$BUILD_DIR/mongo/db/ops/write_ops_parsers',
'$BUILD_DIR/mongo/db/pipeline/serveronly',
'$BUILD_DIR/mongo/db/repair_database',
+ '$BUILD_DIR/mongo/db/repl/dbcheck',
'$BUILD_DIR/mongo/db/repl/oplog',
'$BUILD_DIR/mongo/db/repl/isself',
'$BUILD_DIR/mongo/db/repl/repl_coordinator_impl',
@@ -232,6 +233,9 @@ env.Library(
'killcursors_common',
'write_commands_common',
],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/ops/write_ops_exec',
+ ],
)
env.Library(
diff --git a/src/mongo/db/concurrency/d_concurrency.cpp b/src/mongo/db/concurrency/d_concurrency.cpp
index d5d9fe1684c..0876f5f4f8e 100644
--- a/src/mongo/db/concurrency/d_concurrency.cpp
+++ b/src/mongo/db/concurrency/d_concurrency.cpp
@@ -311,20 +311,9 @@ void Lock::ResourceLock::unlock() {
}
}
-void synchronizeOnCappedInFlightResource(Locker* lockState, const NamespaceString& cappedNs) {
+void synchronizeOnOplogInFlightResource(Locker* lockState) {
dassert(lockState->inAWriteUnitOfWork());
- const ResourceId resource = cappedNs.db() == "local" ? resourceCappedInFlightForLocalDb
- : resourceCappedInFlightForOtherDb;
-
- // It is illegal to acquire the capped in-flight lock for non-local dbs while holding the
- // capped in-flight lock for the local db. (Unless we already hold the otherDb lock since
- // reacquiring a lock in the same mode never blocks.)
- if (resource == resourceCappedInFlightForOtherDb) {
- dassert(!lockState->isLockHeldForMode(resourceCappedInFlightForLocalDb, MODE_IX) ||
- lockState->isLockHeldForMode(resourceCappedInFlightForOtherDb, MODE_IX));
- }
-
- Lock::ResourceLock heldUntilEndOfWUOW{lockState, resource, MODE_IX};
+ Lock::ResourceLock heldUntilEndOfWUOW{lockState, resourceInFlightForOplog, MODE_IX};
}
} // namespace mongo
diff --git a/src/mongo/db/concurrency/d_concurrency.h b/src/mongo/db/concurrency/d_concurrency.h
index 44be439d539..6b0cf4e9547 100644
--- a/src/mongo/db/concurrency/d_concurrency.h
+++ b/src/mongo/db/concurrency/d_concurrency.h
@@ -384,8 +384,8 @@ public:
/**
* Takes a lock on resourceInFlightForOplog in MODE_IX which will be held until the end of your
- * WUOW. This ensures that a MODE_X lock on this resource will wait for all in-flight capped
+ * WUOW. This ensures that a MODE_X lock on this resource will wait for all in-flight oplog
* inserts to either commit or rollback and block new ones from starting.
*/
-void synchronizeOnCappedInFlightResource(Locker* opCtx, const NamespaceString& cappedNs);
+void synchronizeOnOplogInFlightResource(Locker* opCtx);
}
diff --git a/src/mongo/db/concurrency/lock_manager_defs.h b/src/mongo/db/concurrency/lock_manager_defs.h
index e1b72f7b1df..5951955eeb1 100644
--- a/src/mongo/db/concurrency/lock_manager_defs.h
+++ b/src/mongo/db/concurrency/lock_manager_defs.h
@@ -188,8 +188,7 @@ public:
SINGLETON_PARALLEL_BATCH_WRITER_MODE,
SINGLETON_GLOBAL,
SINGLETON_MMAPV1_FLUSH,
- SINGLETON_CAPPED_IN_FLIGHT_OTHER_DB,
- SINGLETON_CAPPED_IN_FLIGHT_LOCAL_DB,
+ SINGLETON_IN_FLIGHT_OPLOG,
};
ResourceId() : _fullHash(0) {}
@@ -265,17 +264,12 @@ extern const ResourceId resourceIdAdminDB;
// TODO: Merge this with resourceIdGlobal
extern const ResourceId resourceIdParallelBatchWriterMode;
-// Everywhere that starts in-flight capped inserts which allocate capped collection RecordIds in
-// a way that could trigger hiding of newer records takes this lock in MODE_IX and holds it
-// until the end of their WriteUnitOfWork. The localDb resource is for capped collections in the
-// local database (including the oplog). The otherDb resource is for capped collections in any other
-// database.
+// Every place that starts oplog inserts takes this lock in MODE_IX and holds it
+// until the end of their WriteUnitOfWork.
//
-// Threads that need a consistent view of the world can lock both of these in MODE_X to prevent
-// concurrent in-flight capped inserts. The otherDb resource must be acquired before the localDb
-// resource.
-extern const ResourceId resourceCappedInFlightForLocalDb;
-extern const ResourceId resourceCappedInFlightForOtherDb;
+// Threads that need a consistent view of the world can lock this in MODE_X to prevent
+// concurrent in-flight oplog inserts.
+extern const ResourceId resourceInFlightForOplog;
/**
* Interface on which granted lock requests will be notified. See the contract for the notify
diff --git a/src/mongo/db/concurrency/lock_state.cpp b/src/mongo/db/concurrency/lock_state.cpp
index 721067e3aad..5409c1190f6 100644
--- a/src/mongo/db/concurrency/lock_state.cpp
+++ b/src/mongo/db/concurrency/lock_state.cpp
@@ -964,9 +964,7 @@ const ResourceId resourceIdOplog = ResourceId(RESOURCE_COLLECTION, StringData("l
const ResourceId resourceIdAdminDB = ResourceId(RESOURCE_DATABASE, StringData("admin"));
const ResourceId resourceIdParallelBatchWriterMode =
ResourceId(RESOURCE_GLOBAL, ResourceId::SINGLETON_PARALLEL_BATCH_WRITER_MODE);
-const ResourceId resourceCappedInFlightForLocalDb =
- ResourceId(RESOURCE_METADATA, ResourceId::SINGLETON_CAPPED_IN_FLIGHT_LOCAL_DB);
-const ResourceId resourceCappedInFlightForOtherDb =
- ResourceId(RESOURCE_METADATA, ResourceId::SINGLETON_CAPPED_IN_FLIGHT_OTHER_DB);
+const ResourceId resourceInFlightForOplog =
+ ResourceId(RESOURCE_METADATA, ResourceId::SINGLETON_IN_FLIGHT_OPLOG);
} // namespace mongo
diff --git a/src/mongo/db/keys_collection_manager_sharding_test.cpp b/src/mongo/db/keys_collection_manager_sharding_test.cpp
index d7af13a09fc..d7c88d097f3 100644
--- a/src/mongo/db/keys_collection_manager_sharding_test.cpp
+++ b/src/mongo/db/keys_collection_manager_sharding_test.cpp
@@ -63,6 +63,10 @@ protected:
serverGlobalParams.featureCompatibility.validateFeaturesAsMaster.store(true);
auto clockSource = stdx::make_unique<ClockSourceMock>();
+ // Timestamps of "0 seconds" are not allowed, so we must advance our clock mock to the first
+ // real second.
+ clockSource->advance(Seconds(1));
+
operationContext()->getServiceContext()->setFastClockSource(std::move(clockSource));
auto catalogClient = Grid::get(operationContext())->catalogClient();
_keyManager =
diff --git a/src/mongo/db/logical_clock_test.cpp b/src/mongo/db/logical_clock_test.cpp
index bf3b78a3062..4dd64809a86 100644
--- a/src/mongo/db/logical_clock_test.cpp
+++ b/src/mongo/db/logical_clock_test.cpp
@@ -133,7 +133,7 @@ TEST_F(LogicalClockTest, InitFromTrustedSourceCanAcceptVeryOldLogicalTime) {
// Verify writes to the oplog advance cluster time.
TEST_F(LogicalClockTest, WritesToOplogAdvanceClusterTime) {
- Timestamp tX(1);
+ Timestamp tX(1, 0);
auto initialTime = LogicalTime(tX);
getClock()->setClusterTimeFromTrustedSource(initialTime);
diff --git a/src/mongo/db/ops/SConscript b/src/mongo/db/ops/SConscript
index 2254606f570..beadfc36549 100644
--- a/src/mongo/db/ops/SConscript
+++ b/src/mongo/db/ops/SConscript
@@ -5,6 +5,24 @@ Import("env")
env = env.Clone()
env.Library(
+ target='write_ops_exec',
+ source=[
+ 'write_ops_exec.cpp',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/repl/oplog',
+ '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
+ '$BUILD_DIR/mongo/util/fail_point',
+ '$BUILD_DIR/mongo/db/write_ops',
+ '$BUILD_DIR/mongo/db/curop',
+ '$BUILD_DIR/mongo/db/db_raii',
+ ],
+)
+
+
+
+env.Library(
target='update',
source=[
'modifier_add_to_set.cpp',
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index 666ca4efcd4..6db044b5d6f 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -298,11 +298,32 @@ WriteResult performCreateIndexes(OperationContext* opCtx, const write_ops::Inser
void insertDocuments(OperationContext* opCtx,
Collection* collection,
- std::vector<InsertStatement>::const_iterator begin,
- std::vector<InsertStatement>::const_iterator end) {
+ std::vector<InsertStatement>::iterator begin,
+ std::vector<InsertStatement>::iterator end) {
// Intentionally not using writeConflictRetry. That is handled by the caller so it can react to
// oversized batches.
WriteUnitOfWork wuow(opCtx);
+
+ // Acquire optimes and fill them in for each item in the batch.
+ // This must only be done for doc-locking storage engines, which are allowed to insert oplog
+ // documents out-of-timestamp-order. For other storage engines, the oplog entries must be
+ // physically written in timestamp order, so we defer optime assignment until the oplog is about
+ // to be written.
+ auto batchSize = std::distance(begin, end);
+ std::unique_ptr<OplogSlot[]> slots(new OplogSlot[batchSize]);
+ if (supportsDocLocking()) {
+ auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ if (!replCoord->isOplogDisabledFor(opCtx, collection->ns())) {
+ // Populate 'slots' with new optimes for each insert.
+ // This also notifies the storage engine of each new timestamp.
+ repl::getNextOpTimes(opCtx, batchSize, slots.get());
+ OplogSlot* slot = slots.get();
+ for (auto it = begin; it != end; it++) {
+ it->oplogSlot = *slot++;
+ }
+ }
+ }
+
uassertStatusOK(collection->insertDocuments(
opCtx, begin, end, &CurOp::get(opCtx)->debug(), /*enforceQuota*/ true));
wuow.commit();
@@ -313,7 +334,7 @@ void insertDocuments(OperationContext* opCtx,
*/
bool insertBatchAndHandleErrors(OperationContext* opCtx,
const write_ops::Insert& wholeOp,
- const std::vector<InsertStatement>& batch,
+ std::vector<InsertStatement>& batch,
LastOpFixer* lastOpFixer,
WriteResult* out) {
if (batch.empty())
diff --git a/src/mongo/db/repair_database.cpp b/src/mongo/db/repair_database.cpp
index 552f4577f04..b8feba841c1 100644
--- a/src/mongo/db/repair_database.cpp
+++ b/src/mongo/db/repair_database.cpp
@@ -249,8 +249,11 @@ Status repairDatabase(OperationContext* opCtx,
if (engine->isMmapV1()) {
// MMAPv1 is a layering violation so it implements its own repairDatabase.
- return static_cast<MMAPV1Engine*>(engine)->repairDatabase(
+ auto status = static_cast<MMAPV1Engine*>(engine)->repairDatabase(
opCtx, dbName, preserveClonedFilesOnFailure, backupOriginalFiles);
+ // Restore oplog Collection pointer cache.
+ repl::acquireOplogCollectionForLogging(opCtx);
+ return status;
}
// These are MMAPv1 specific
@@ -278,6 +281,9 @@ Status repairDatabase(OperationContext* opCtx,
for (auto&& collection : *db) {
collection->setMinimumVisibleSnapshot(snapshotName);
}
+
+ // Restore oplog Collection pointer cache.
+ repl::acquireOplogCollectionForLogging(opCtx);
} catch (...) {
severe() << "Unexpected exception encountered while reopening database after repair.";
std::terminate(); // Logs additional info about the specific error.
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 4cf37c9f16e..0b1cf62ad90 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -9,25 +9,31 @@ env.Library(
source=[
'apply_ops.cpp',
'oplog.cpp',
- 'oplogreader.cpp',
],
- LIBDEPS=[
- 'dbcheck',
- 'repl_coordinator_interface',
- 'repl_coordinator_global',
+ LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/background',
- '$BUILD_DIR/mongo/db/catalog/database',
- '$BUILD_DIR/mongo/db/catalog/database_holder',
- '$BUILD_DIR/mongo/db/concurrency/lock_manager',
+ '$BUILD_DIR/mongo/db/catalog/catalog_helpers',
'$BUILD_DIR/mongo/db/db_raii',
'$BUILD_DIR/mongo/db/dbdirectclient',
'$BUILD_DIR/mongo/db/dbhelpers',
'$BUILD_DIR/mongo/db/index_d',
- '$BUILD_DIR/mongo/db/index/index_descriptor',
- '$BUILD_DIR/mongo/db/write_ops',
- '$BUILD_DIR/mongo/db/catalog/catalog_helpers',
- '$BUILD_DIR/mongo/db/namespace_string',
+ 'dbcheck',
+ 'repl_coordinator_interface',
+ ],
+)
+
+env.Library(
+ target='oplogreader',
+ source=[
+ 'oplogreader.cpp',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/client/clientdriver',
+ '$BUILD_DIR/mongo/db/auth/authcommon',
+ '$BUILD_DIR/mongo/db/auth/authorization_manager_global',
+ '$BUILD_DIR/mongo/util/net/network',
],
)
@@ -37,12 +43,12 @@ env.CppUnitTest(
'oplog_test.cpp',
],
LIBDEPS=[
- 'oplog',
- 'oplog_entry',
- 'replmocks',
'$BUILD_DIR/mongo/db/service_context_d_test_fixture',
'$BUILD_DIR/mongo/unittest/concurrency',
'$BUILD_DIR/mongo/util/concurrency/thread_pool',
+ 'oplog',
+ 'oplog_entry',
+ 'replmocks',
],
)
@@ -54,6 +60,7 @@ env.Library(
LIBDEPS=[
'oplog',
'oplog_interface_remote',
+ 'oplogreader',
'repl_coordinator_interface',
'repl_coordinator_global',
'$BUILD_DIR/mongo/base',
@@ -475,6 +482,7 @@ env.Library(
'oplog',
'replication_process',
'roll_back_local_operations',
+ '$BUILD_DIR/mongo/db/catalog/catalog_helpers',
'$BUILD_DIR/mongo/db/catalog/database_holder',
'$BUILD_DIR/mongo/db/s/sharding',
'$BUILD_DIR/mongo/db/write_ops',
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index 2f438ec436c..edd114366a4 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -505,7 +505,7 @@ void InitialSyncer::_chooseSyncSourceCallback(
// There is no need to schedule separate task to create oplog collection since we are already in
// a callback and we are certain there's no existing operation context (required for creating
// collections and dropping user databases) attached to the current thread.
- status = _recreateOplogAndDropReplicatedDatabases();
+ status = _truncateOplogAndDropReplicatedDatabases();
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
return;
@@ -527,9 +527,9 @@ void InitialSyncer::_chooseSyncSourceCallback(
_getBaseRollbackIdHandle = scheduleResult.getValue();
}
-Status InitialSyncer::_recreateOplogAndDropReplicatedDatabases() {
- // drop/create oplog; drop user databases.
- LOG(1) << "About to drop+create the oplog, if it exists, ns:" << _opts.localOplogNS
+Status InitialSyncer::_truncateOplogAndDropReplicatedDatabases() {
+ // truncate oplog; drop user databases.
+ LOG(1) << "About to truncate the oplog, if it exists, ns:" << _opts.localOplogNS
<< ", and drop all user databases (so that we can clone them).";
auto opCtx = makeOpCtx();
@@ -537,23 +537,21 @@ Status InitialSyncer::_recreateOplogAndDropReplicatedDatabases() {
// We are not replicating nor validating these writes.
UnreplicatedWritesBlock unreplicatedWritesBlock(opCtx.get());
- // 1.) Drop the oplog.
- LOG(2) << "Dropping the existing oplog: " << _opts.localOplogNS;
- auto status = _storage->dropCollection(opCtx.get(), _opts.localOplogNS);
+ // 1.) Truncate the oplog.
+ LOG(2) << "Truncating the existing oplog: " << _opts.localOplogNS;
+ auto status = _storage->truncateCollection(opCtx.get(), _opts.localOplogNS);
if (!status.isOK()) {
- return status;
+ // 1a.) Create the oplog.
+ LOG(2) << "Creating the oplog: " << _opts.localOplogNS;
+ status = _storage->createOplog(opCtx.get(), _opts.localOplogNS);
+ if (!status.isOK()) {
+ return status;
+ }
}
// 2.) Drop user databases.
- LOG(2) << "Dropping user databases";
- status = _storage->dropReplicatedDatabases(opCtx.get());
- if (!status.isOK()) {
- return status;
- }
-
- // 3.) Create the oplog.
- LOG(2) << "Creating the oplog: " << _opts.localOplogNS;
- return _storage->createOplog(opCtx.get(), _opts.localOplogNS);
+ LOG(2) << "Dropping user databases";
+ return _storage->dropReplicatedDatabases(opCtx.get());
}
void InitialSyncer::_rollbackCheckerResetCallback(
diff --git a/src/mongo/db/repl/initial_syncer.h b/src/mongo/db/repl/initial_syncer.h
index 8ee406d8876..63095486232 100644
--- a/src/mongo/db/repl/initial_syncer.h
+++ b/src/mongo/db/repl/initial_syncer.h
@@ -278,7 +278,7 @@ private:
* |
* |
* V
- * _recreateOplogAndDropReplicatedDatabases()
+ * _truncateOplogAndDropReplicatedDatabases()
* |
* |
* V
@@ -368,11 +368,10 @@ private:
/**
* This function does the following:
- * 1.) Drop oplog.
+ * 1.) Truncate oplog (if truncation fails, create the oplog instead).
* 2.) Drop user databases (replicated dbs).
- * 3.) Create oplog.
*/
- Status _recreateOplogAndDropReplicatedDatabases();
+ Status _truncateOplogAndDropReplicatedDatabases();
/**
* Callback for rollback checker's first replSetGetRBID command before starting data cloning.
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 274678a10bf..8d765cefc04 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -216,6 +216,7 @@ public:
protected:
struct StorageInterfaceResults {
bool createOplogCalled = false;
+ bool truncateCalled = false;
bool insertedOplogEntries = false;
int oplogEntriesInserted = 0;
bool droppedUserDBs = false;
@@ -235,6 +236,12 @@ protected:
_storageInterfaceWorkDone.createOplogCalled = true;
return Status::OK();
};
+ _storageInterface->truncateCollFn = [this](OperationContext* opCtx,
+ const NamespaceString& nss) {
+ LockGuard lock(_storageInterfaceWorkDoneMutex);
+ _storageInterfaceWorkDone.truncateCalled = true;
+ return Status::OK();
+ };
_storageInterface->insertDocumentFn = [this](
OperationContext* opCtx, const NamespaceString& nss, const TimestampedBSONObj& doc) {
LockGuard lock(_storageInterfaceWorkDoneMutex);
@@ -921,14 +928,13 @@ TEST_F(InitialSyncerTest, InitialSyncerResetsOnCompletionCallbackFunctionPointer
ASSERT_TRUE(sharedCallbackStateDestroyed);
}
-TEST_F(InitialSyncerTest, InitialSyncerRecreatesOplogAndDropsReplicatedDatabases) {
- // We are not interested in proceeding beyond the oplog creation stage so we inject a failure
- // after setting '_storageInterfaceWorkDone.createOplogCalled' to true.
- auto oldCreateOplogFn = _storageInterface->createOplogFn;
- _storageInterface->createOplogFn = [oldCreateOplogFn](OperationContext* opCtx,
- const NamespaceString& nss) {
- oldCreateOplogFn(opCtx, nss).transitional_ignore();
- return Status(ErrorCodes::OperationFailed, "oplog creation failed");
+TEST_F(InitialSyncerTest, InitialSyncerTruncatesOplogAndDropsReplicatedDatabases) {
+ // We are not interested in proceeding beyond the dropUserDB stage so we inject a failure
+ // after setting '_storageInterfaceWorkDone.droppedUserDBs' to true.
+ auto oldDropUserDBsFn = _storageInterface->dropUserDBsFn;
+ _storageInterface->dropUserDBsFn = [oldDropUserDBsFn](OperationContext* opCtx) {
+ ASSERT_OK(oldDropUserDBsFn(opCtx));
+ return Status(ErrorCodes::OperationFailed, "drop userdbs failed");
};
auto initialSyncer = &getInitialSyncer();
@@ -941,8 +947,8 @@ TEST_F(InitialSyncerTest, InitialSyncerRecreatesOplogAndDropsReplicatedDatabases
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
LockGuard lock(_storageInterfaceWorkDoneMutex);
+ ASSERT_TRUE(_storageInterfaceWorkDone.truncateCalled);
ASSERT_TRUE(_storageInterfaceWorkDone.droppedUserDBs);
- ASSERT_TRUE(_storageInterfaceWorkDone.createOplogCalled);
}
TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetRollbackIdScheduleError) {
@@ -973,13 +979,13 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetRollbackIdScheduleError)
TEST_F(
InitialSyncerTest,
InitialSyncerReturnsShutdownInProgressIfSchedulingRollbackCheckerFailedDueToExecutorShutdown) {
- // The rollback id request is sent immediately after oplog creation. We shut the task executor
- // down before returning from createOplog() to make the scheduleRemoteCommand() call for
+ // The rollback id request is sent immediately after oplog truncation. We shut the task executor
+ // down before returning from truncate() to make the scheduleRemoteCommand() call for
// replSetGetRBID fail.
- auto oldCreateOplogFn = _storageInterface->createOplogFn;
- _storageInterface->createOplogFn = [oldCreateOplogFn, this](OperationContext* opCtx,
- const NamespaceString& nss) {
- auto status = oldCreateOplogFn(opCtx, nss);
+ auto oldTruncateCollFn = _storageInterface->truncateCollFn;
+ _storageInterface->truncateCollFn = [oldTruncateCollFn, this](OperationContext* opCtx,
+ const NamespaceString& nss) {
+ auto status = oldTruncateCollFn(opCtx, nss);
getExecutor().shutdown();
return status;
};
@@ -994,7 +1000,7 @@ TEST_F(
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, _lastApplied);
LockGuard lock(_storageInterfaceWorkDoneMutex);
- ASSERT_TRUE(_storageInterfaceWorkDone.createOplogCalled);
+ ASSERT_TRUE(_storageInterfaceWorkDone.truncateCalled);
}
TEST_F(InitialSyncerTest, InitialSyncerCancelsRollbackCheckerOnShutdown) {
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 4508ef2621f..a3fc12b7adb 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -116,13 +116,17 @@ std::string masterSlaveOplogName = "local.oplog.$main";
MONGO_FP_DECLARE(disableSnapshotting);
namespace {
-// cached copy...so don't rename, drop, etc.!!!
+/**
+ * The `_localOplogCollection` pointer is always valid (or null) because an
+ * operation must take the global exclusive lock to set the pointer to null when
+ * the Collection instance is destroyed. See `oplogCheckCloseDatabase`.
+ */
Collection* _localOplogCollection = nullptr;
PseudoRandom hashGenerator(std::unique_ptr<SecureRandom>(SecureRandom::create())->nextInt64());
-// Synchronizes the section where a new Timestamp is generated and when it actually
-// appears in the oplog.
+// Synchronizes the section where a new Timestamp is generated and when it is registered in the
+// storage engine.
stdx::mutex newOpMutex;
stdx::condition_variable newTimestampNotifier;
// Remembers the last timestamp generated for creating new oplog entries or last timestamp of
@@ -137,27 +141,16 @@ void checkOplogInsert(Status result) {
massert(17322, str::stream() << "write to oplog failed: " << result.toString(), result.isOK());
}
-struct OplogSlot {
- OpTime opTime;
- int64_t hash = 0;
-};
-
-/**
- * Allocates an optime for a new entry in the oplog, and updates the replication coordinator to
- * reflect that new optime. Returns the new optime and the correct value of the "h" field for
- * the new oplog entry.
- */
-void getNextOpTime(OperationContext* opCtx,
- Collection* oplog,
- ReplicationCoordinator* replCoord,
- ReplicationCoordinator::Mode replicationMode,
- unsigned count,
- OplogSlot* slotsOut) {
- synchronizeOnCappedInFlightResource(opCtx->lockState(), oplog->ns());
+void _getNextOpTimes(OperationContext* opCtx,
+ Collection* oplog,
+ std::size_t count,
+ OplogSlot* slotsOut) {
+ synchronizeOnOplogInFlightResource(opCtx->lockState());
+ auto replCoord = ReplicationCoordinator::get(opCtx);
long long term = OpTime::kUninitializedTerm;
// Fetch term out of the newOpMutex.
- if (replicationMode == ReplicationCoordinator::modeReplSet &&
+ if (replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet &&
replCoord->isV1ElectionProtocol()) {
// Current term. If we're not a replset of pv=1, it remains kUninitializedTerm.
term = replCoord->getTerm();
@@ -172,8 +165,8 @@ void getNextOpTime(OperationContext* opCtx,
fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(opCtx, ts));
// Set hash if we're in replset mode, otherwise it remains 0 in master/slave.
- const bool needHash = (replicationMode == ReplicationCoordinator::modeReplSet);
- for (unsigned i = 0; i < count; i++) {
+ const bool needHash = (replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet);
+ for (std::size_t i = 0; i < count; i++) {
slotsOut[i].opTime = {Timestamp(ts.asULL() + i), term};
if (needHash) {
slotsOut[i].hash = hashGenerator.nextInt64();
@@ -222,11 +215,18 @@ private:
} // namespace
void setOplogCollectionName() {
- if (getGlobalReplicationCoordinator()->getReplicationMode() ==
- ReplicationCoordinator::modeReplSet) {
- _oplogCollectionName = NamespaceString::kRsOplogNamespace.ns();
- } else {
- _oplogCollectionName = masterSlaveOplogName;
+ switch (getGlobalReplicationCoordinator()->getReplicationMode()) {
+ case ReplicationCoordinator::modeReplSet:
+ _oplogCollectionName = NamespaceString::kRsOplogNamespace.ns();
+ break;
+ case ReplicationCoordinator::modeMasterSlave:
+ _oplogCollectionName = masterSlaveOplogName;
+ break;
+ case ReplicationCoordinator::modeNone:
+ // leave empty.
+ break;
+ default:
+ MONGO_UNREACHABLE;
}
}
@@ -275,21 +275,6 @@ void createIndexForApplyOps(OperationContext* opCtx,
namespace {
-Collection* getLocalOplogCollection(OperationContext* opCtx,
- const std::string& oplogCollectionName) {
- if (_localOplogCollection)
- return _localOplogCollection;
-
- AutoGetCollection autoColl(opCtx, NamespaceString(oplogCollectionName), MODE_IX);
- _localOplogCollection = autoColl.getCollection();
- massert(13347,
- "the oplog collection " + oplogCollectionName +
- " missing. did you drop it? if so, restart the server",
- _localOplogCollection);
-
- return _localOplogCollection;
-}
-
/**
* Attaches the session information of a write to an oplog entry if it exists.
*/
@@ -392,11 +377,9 @@ void _logOpsInner(OperationContext* opCtx,
Timestamp* timestamps,
size_t nDocs,
Collection* oplogCollection,
- ReplicationCoordinator::Mode replicationMode,
OpTime finalOpTime) {
- ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
-
- if (nss.size() && replicationMode == ReplicationCoordinator::modeReplSet &&
+ auto replCoord = ReplicationCoordinator::get(opCtx);
+ if (nss.size() && replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet &&
!replCoord->canAcceptWritesFor(opCtx, nss)) {
severe() << "logOp() but can't accept write to collection " << nss.ns();
fassertFailed(17405);
@@ -431,12 +414,11 @@ OpTime logOp(OperationContext* opCtx,
Lock::DBLock lk(opCtx, NamespaceString::kLocalDb, MODE_IX);
Lock::CollectionLock lock(opCtx->lockState(), _oplogCollectionName, MODE_IX);
- auto const oplog = getLocalOplogCollection(opCtx, _oplogCollectionName);
- auto const replMode = replCoord->getReplicationMode();
+ auto const oplog = _localOplogCollection;
OplogSlot slot;
WriteUnitOfWork wuow(opCtx);
- getNextOpTime(opCtx, oplog, replCoord, replMode, 1, &slot);
+ _getNextOpTimes(opCtx, oplog, 1, &slot);
auto writer = _logOpWriter(opCtx,
opstr,
@@ -453,7 +435,7 @@ OpTime logOp(OperationContext* opCtx,
oplogLink);
const DocWriter* basePtr = &writer;
auto timestamp = slot.opTime.getTimestamp();
- _logOpsInner(opCtx, nss, &basePtr, &timestamp, 1, oplog, replMode, slot.opTime);
+ _logOpsInner(opCtx, nss, &basePtr, &timestamp, 1, oplog, slot.opTime);
wuow.commit();
return slot.opTime;
}
@@ -476,14 +458,11 @@ repl::OpTime logInsertOps(OperationContext* opCtx,
const size_t count = end - begin;
std::vector<OplogDocWriter> writers;
writers.reserve(count);
- Collection* oplog = getLocalOplogCollection(opCtx, _oplogCollectionName);
+ Collection* oplog = _localOplogCollection;
Lock::DBLock lk(opCtx, "local", MODE_IX);
Lock::CollectionLock lock(opCtx->lockState(), _oplogCollectionName, MODE_IX);
- std::unique_ptr<OplogSlot[]> slots(new OplogSlot[count]);
- auto replMode = replCoord->getReplicationMode();
WriteUnitOfWork wuow(opCtx);
- getNextOpTime(opCtx, oplog, replCoord, replMode, count, slots.get());
auto wallTime = Date_t::now();
OplogLink oplogLink;
@@ -496,39 +475,40 @@ repl::OpTime logInsertOps(OperationContext* opCtx,
}
auto timestamps = stdx::make_unique<Timestamp[]>(count);
+ OpTime lastOpTime;
for (size_t i = 0; i < count; i++) {
- auto insertStatement = begin[i];
+ // Make a mutable copy.
+ auto insertStatementOplogSlot = begin[i].oplogSlot;
+ // Fetch optime now, if not already fetched.
+ if (insertStatementOplogSlot.opTime.isNull()) {
+ _getNextOpTimes(opCtx, oplog, 1, &insertStatementOplogSlot);
+ }
writers.emplace_back(_logOpWriter(opCtx,
"i",
nss,
uuid,
- insertStatement.doc,
+ begin[i].doc,
NULL,
fromMigrate,
- slots[i].opTime,
- slots[i].hash,
+ insertStatementOplogSlot.opTime,
+ insertStatementOplogSlot.hash,
wallTime,
sessionInfo,
- insertStatement.stmtId,
+ begin[i].stmtId,
oplogLink));
- oplogLink.prevTs = slots[i].opTime.getTimestamp();
- timestamps[i] = slots[i].opTime.getTimestamp();
+ oplogLink.prevTs = insertStatementOplogSlot.opTime.getTimestamp();
+ timestamps[i] = oplogLink.prevTs;
+ lastOpTime = insertStatementOplogSlot.opTime;
}
std::unique_ptr<DocWriter const* []> basePtrs(new DocWriter const*[count]);
for (size_t i = 0; i < count; i++) {
basePtrs[i] = &writers[i];
}
- _logOpsInner(opCtx,
- nss,
- basePtrs.get(),
- timestamps.get(),
- count,
- oplog,
- replMode,
- slots[count - 1].opTime);
+ invariant(!lastOpTime.isNull());
+ _logOpsInner(opCtx, nss, basePtrs.get(), timestamps.get(), count, oplog, lastOpTime);
wuow.commit();
- return slots[count - 1].opTime;
+ return lastOpTime;
}
namespace {
@@ -599,7 +579,7 @@ void createOplog(OperationContext* opCtx, const std::string& oplogCollectionName
throw AssertionException(13257, ss.str());
}
}
-
+ acquireOplogCollectionForLogging(opCtx);
if (!isReplSet)
initTimestampFromOplog(opCtx, oplogCollectionName);
return;
@@ -619,6 +599,7 @@ void createOplog(OperationContext* opCtx, const std::string& oplogCollectionName
writeConflictRetry(opCtx, "createCollection", oplogCollectionName, [&] {
WriteUnitOfWork uow(opCtx);
invariant(ctx.db()->createCollection(opCtx, oplogCollectionName, options));
+ acquireOplogCollectionForLogging(opCtx);
if (!isReplSet)
getGlobalServiceContext()->getOpObserver()->onOpMessage(opCtx, BSONObj());
uow.commit();
@@ -627,6 +608,7 @@ void createOplog(OperationContext* opCtx, const std::string& oplogCollectionName
/* sync here so we don't get any surprising lag later when we try to sync */
StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine();
storageEngine->flushAllFiles(opCtx, true);
+
log() << "******" << endl;
}
@@ -636,6 +618,14 @@ void createOplog(OperationContext* opCtx) {
createOplog(opCtx, _oplogCollectionName, isReplSet);
}
+void getNextOpTimes(OperationContext* opCtx, std::size_t count, OplogSlot* slotsOut) {
+ // The local oplog collection pointer must already be established by this point.
+ // We can't establish it here because that would require locking the local database, which would
+ // be a lock order violation.
+ invariant(_localOplogCollection);
+ _getNextOpTimes(opCtx, _localOplogCollection, count, slotsOut);
+}
+
// -------------------------------------
namespace {
@@ -1317,6 +1307,7 @@ void setNewTimestamp(ServiceContext* service, const Timestamp& newTime) {
void initTimestampFromOplog(OperationContext* opCtx, const std::string& oplogNS) {
DBDirectClient c(opCtx);
+ static const BSONObj reverseNaturalObj = BSON("$natural" << -1);
BSONObj lastOp = c.findOne(oplogNS, Query().sort(reverseNaturalObj), NULL, QueryOption_SlaveOk);
if (!lastOp.isEmpty()) {
@@ -1328,8 +1319,18 @@ void initTimestampFromOplog(OperationContext* opCtx, const std::string& oplogNS)
void oplogCheckCloseDatabase(OperationContext* opCtx, Database* db) {
invariant(opCtx->lockState()->isW());
+ if (db->name() == "local") {
+ _localOplogCollection = nullptr;
+ }
+}
+
- _localOplogCollection = nullptr;
+void acquireOplogCollectionForLogging(OperationContext* opCtx) {
+ if (!_oplogCollectionName.empty()) {
+ AutoGetCollection autoColl(opCtx, NamespaceString(_oplogCollectionName), MODE_IX);
+ _localOplogCollection = autoColl.getCollection();
+ fassert(13347, _localOplogCollection);
+ }
}
void signalOplogWaiters() {
@@ -1429,12 +1430,10 @@ void SnapshotThread::run() {
SnapshotName name(0); // assigned real value in block.
{
- // Make sure there are no in-flight capped inserts while we create our snapshot.
+ // Make sure there are no in-flight oplog inserts while we create our snapshot.
// This lock cannot be acquired until all writes holding the resource commit/abort.
- Lock::ResourceLock cappedInsertLockForOtherDb(
- opCtx->lockState(), resourceCappedInFlightForOtherDb, MODE_X);
- Lock::ResourceLock cappedInsertLockForLocalDb(
- opCtx->lockState(), resourceCappedInFlightForLocalDb, MODE_X);
+ Lock::ResourceLock cappedInsertLockForOplog(
+ opCtx->lockState(), resourceInFlightForOplog, MODE_X);
// Reserve the name immediately before we take our snapshot. This ensures that all
// names that compare lower must be from points in time visible to this named
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index 812efb81310..2198bb86c7e 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -34,10 +34,11 @@
#include "mongo/base/status.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/timestamp.h"
-#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/collection_options.h"
+#include "mongo/db/logical_session_id.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/storage/snapshot_name.h"
#include "mongo/stdx/functional.h"
namespace mongo {
@@ -48,6 +49,30 @@ class OperationContext;
class OperationSessionInfo;
class Session;
+struct OplogSlot {
+ OplogSlot() {}
+ OplogSlot(repl::OpTime opTime, std::int64_t hash) : opTime(opTime), hash(hash) {}
+ repl::OpTime opTime;
+ std::int64_t hash = 0;
+};
+
+struct InsertStatement {
+public:
+ InsertStatement() = default;
+ explicit InsertStatement(BSONObj toInsert) : doc(toInsert) {}
+
+ InsertStatement(StmtId statementId, BSONObj toInsert) : stmtId(statementId), doc(toInsert) {}
+ InsertStatement(StmtId statementId, BSONObj toInsert, OplogSlot os)
+ : stmtId(statementId), oplogSlot(os), doc(toInsert) {}
+ InsertStatement(BSONObj toInsert, SnapshotName ts)
+ : oplogSlot(repl::OpTime(Timestamp(ts.asU64()), repl::OpTime::kUninitializedTerm), 0),
+ doc(toInsert) {}
+
+ StmtId stmtId = kUninitializedStmtId;
+ OplogSlot oplogSlot;
+ BSONObj doc;
+};
+
namespace repl {
class ReplSettings;
@@ -118,10 +143,15 @@ OpTime logOp(OperationContext* opCtx,
StmtId stmtId,
const OplogLink& oplogLink);
-// Flush out the cached pointers to the local database and oplog.
+// Flush out the cached pointer to the oplog.
// Used by the closeDatabase command to ensure we don't cache closed things.
void oplogCheckCloseDatabase(OperationContext* opCtx, Database* db);
+/**
+ * Establish the cached pointer to the local oplog.
+ */
+void acquireOplogCollectionForLogging(OperationContext* opCtx);
+
using IncrementOpsAppliedStatsFn = stdx::function<void()>;
/**
* Take the object field of a BSONObj, the BSONObj, and the namespace of
@@ -184,5 +214,11 @@ void createIndexForApplyOps(OperationContext* opCtx,
const NamespaceString& indexNss,
IncrementOpsAppliedStatsFn incrementOpsAppliedStats);
+/**
+ * Allocates optimes for `count` new entries in the oplog and stores them in `slotsOut`, an array
+ * of at least `count` OplogSlots supplied by the caller. Each slot contains a new optime along
+ * with its term and newly calculated hash field.
+ */
+void getNextOpTimes(OperationContext* opCtx, std::size_t count, OplogSlot* slotsOut);
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp
index 744045df7f3..659e061c554 100644
--- a/src/mongo/db/repl/oplogreader.cpp
+++ b/src/mongo/db/repl/oplogreader.cpp
@@ -50,8 +50,6 @@ using std::string;
namespace repl {
-const BSONObj reverseNaturalObj = BSON("$natural" << -1);
-
bool replAuthenticate(DBClientBase* conn) {
if (isInternalAuthSet())
return conn->authenticateInternalUser();
diff --git a/src/mongo/db/repl/oplogreader.h b/src/mongo/db/repl/oplogreader.h
index 7307d14e406..92e224a6498 100644
--- a/src/mongo/db/repl/oplogreader.h
+++ b/src/mongo/db/repl/oplogreader.h
@@ -37,9 +37,6 @@
namespace mongo {
namespace repl {
-// {"$natural": -1 }
-extern const BSONObj reverseNaturalObj;
-
/**
* Authenticates conn using the server's cluster-membership credentials.
*
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 06b78d50ef5..674ec4d5940 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -224,6 +224,9 @@ bool ReplicationCoordinatorExternalStateImpl::isInitialSyncFlagSet(OperationCont
void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication(
OperationContext* opCtx, ReplicationCoordinator* replCoord) {
+ // Initialize the cached pointer to the oplog collection, for writing to the oplog.
+ acquireOplogCollectionForLogging(opCtx);
+
LockGuard lk(_threadMutex);
invariant(replCoord);
invariant(!_bgSync);
diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h
index 840b40e2c16..309b1715aa0 100644
--- a/src/mongo/db/repl/storage_interface.h
+++ b/src/mongo/db/repl/storage_interface.h
@@ -144,11 +144,16 @@ public:
const CollectionOptions& options) = 0;
/**
- * Drops a collection, like the oplog.
+ * Drops a collection.
*/
virtual Status dropCollection(OperationContext* opCtx, const NamespaceString& nss) = 0;
/**
+ * Truncates a collection.
+ */
+ virtual Status truncateCollection(OperationContext* opCtx, const NamespaceString& nss) = 0;
+
+ /**
* Renames a collection from the "fromNS" to the "toNS". Fails if the new collection already
* exists.
*/
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index db090542ed6..e9f6b002cf6 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -411,10 +411,32 @@ Status StorageInterfaceImpl::dropCollection(OperationContext* opCtx, const Names
}
WriteUnitOfWork wunit(opCtx);
const auto status = autoDB.getDb()->dropCollectionEvenIfSystem(opCtx, nss);
- if (status.isOK()) {
- wunit.commit();
+ if (!status.isOK()) {
+ return status;
}
- return status;
+ wunit.commit();
+ return Status::OK();
+ });
+}
+
+Status StorageInterfaceImpl::truncateCollection(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ return writeConflictRetry(opCtx, "StorageInterfaceImpl::truncateCollection", nss.ns(), [&] {
+ AutoGetCollection autoColl(opCtx, nss, MODE_X);
+ auto collectionResult =
+ getCollection(autoColl, nss, "The collection must exist before truncating.");
+ if (!collectionResult.isOK()) {
+ return collectionResult.getStatus();
+ }
+ auto collection = collectionResult.getValue();
+
+ WriteUnitOfWork wunit(opCtx);
+ const auto status = collection->truncate(opCtx);
+ if (!status.isOK()) {
+ return status;
+ }
+ wunit.commit();
+ return Status::OK();
});
}
diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h
index 2b8ba24f022..dfb367f398d 100644
--- a/src/mongo/db/repl/storage_interface_impl.h
+++ b/src/mongo/db/repl/storage_interface_impl.h
@@ -84,6 +84,8 @@ public:
Status dropCollection(OperationContext* opCtx, const NamespaceString& nss) override;
+ Status truncateCollection(OperationContext* opCtx, const NamespaceString& nss) override;
+
Status renameCollection(OperationContext* opCtx,
const NamespaceString& fromNS,
const NamespaceString& toNS,
diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h
index a0cd570cf8d..18902950699 100644
--- a/src/mongo/db/repl/storage_interface_mock.h
+++ b/src/mongo/db/repl/storage_interface_mock.h
@@ -99,6 +99,8 @@ public:
stdx::function<Status(OperationContext* opCtx, const NamespaceString& nss)>;
using CreateCollectionFn = stdx::function<Status(
OperationContext* opCtx, const NamespaceString& nss, const CollectionOptions& options)>;
+ using TruncateCollectionFn =
+ stdx::function<Status(OperationContext* opCtx, const NamespaceString& nss)>;
using DropCollectionFn =
stdx::function<Status(OperationContext* opCtx, const NamespaceString& nss)>;
using FindDocumentsFn =
@@ -168,6 +170,10 @@ public:
return dropCollFn(opCtx, nss);
};
+ Status truncateCollection(OperationContext* opCtx, const NamespaceString& nss) override {
+ return truncateCollFn(opCtx, nss);
+ }
+
Status renameCollection(OperationContext* opCtx,
const NamespaceString& fromNS,
const NamespaceString& toNS,
@@ -292,6 +298,9 @@ public:
[](OperationContext* opCtx, const NamespaceString& nss, const CollectionOptions& options) {
return Status{ErrorCodes::IllegalOperation, "CreateCollectionFn not implemented."};
};
+ TruncateCollectionFn truncateCollFn = [](OperationContext* opCtx, const NamespaceString& nss) {
+ return Status{ErrorCodes::IllegalOperation, "TruncateCollectionFn not implemented."};
+ };
DropCollectionFn dropCollFn = [](OperationContext* opCtx, const NamespaceString& nss) {
return Status{ErrorCodes::IllegalOperation, "DropCollectionFn not implemented."};
};
diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp
index 807cb94716e..0c7dc580452 100644
--- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp
+++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp
@@ -385,9 +385,15 @@ StatusWith<RecordId> EphemeralForTestRecordStore::extractAndCheckLocForOplog(con
if (!status.isOK())
return status;
- if (!_data->records.empty() && status.getValue() <= _data->records.rbegin()->first)
- return StatusWith<RecordId>(ErrorCodes::BadValue, "ts not higher than highest");
-
+ if (!_data->records.empty() && status.getValue() <= _data->records.rbegin()->first) {
+
+ return StatusWith<RecordId>(ErrorCodes::BadValue,
+ str::stream() << "attempted out-of-order oplog insert of "
+ << status.getValue()
+ << " (oplog last insert was "
+ << _data->records.rbegin()->first
+ << " )");
+ }
return status;
}
@@ -432,8 +438,8 @@ Status EphemeralForTestRecordStore::insertRecordsWithDocWriter(OperationContext*
for (size_t i = 0; i < nDocs; i++) {
const int len = docs[i]->documentSize();
if (_isCapped && len > _cappedMaxSize) {
- // We use dataSize for capped rollover and we don't want to delete everything if we know
- // this won't fit.
+ // We use dataSize for capped rollover and we don't want to delete everything if we
+ // know this won't fit.
return Status(ErrorCodes::BadValue, "object to insert exceeds cappedMaxSize");
}