diff options
author | Eric Milkie <milkie@10gen.com> | 2017-08-29 13:51:15 -0400 |
---|---|---|
committer | Eric Milkie <milkie@10gen.com> | 2017-09-12 16:04:57 -0400 |
commit | 6264d36ac6002b296aa41b8dc79400fcc2cbdd74 (patch) | |
tree | f1c0971779064bbf6c3fde535b7666d8738f236d /src/mongo/db/repl | |
parent | 978521eb3926867b30903781fd89d4acd931f0c4 (diff) | |
download | mongo-6264d36ac6002b296aa41b8dc79400fcc2cbdd74.tar.gz |
SERVER-30827 SERVER-30639 Timestamp bulk writes via changes to optime generator
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r-- | src/mongo/db/repl/SConscript | 38 | ||||
-rw-r--r-- | src/mongo/db/repl/initial_syncer.cpp | 32 | ||||
-rw-r--r-- | src/mongo/db/repl/initial_syncer.h | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/initial_syncer_test.cpp | 38 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 155 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.h | 40 | ||||
-rw-r--r-- | src/mongo/db/repl/oplogreader.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/oplogreader.h | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface.h | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_impl.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_impl.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_mock.h | 9 |
13 files changed, 223 insertions, 141 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 4cf37c9f16e..0b1cf62ad90 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -9,25 +9,31 @@ env.Library( source=[ 'apply_ops.cpp', 'oplog.cpp', - 'oplogreader.cpp', ], - LIBDEPS=[ - 'dbcheck', - 'repl_coordinator_interface', - 'repl_coordinator_global', + LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/background', - '$BUILD_DIR/mongo/db/catalog/database', - '$BUILD_DIR/mongo/db/catalog/database_holder', - '$BUILD_DIR/mongo/db/concurrency/lock_manager', + '$BUILD_DIR/mongo/db/catalog/catalog_helpers', '$BUILD_DIR/mongo/db/db_raii', '$BUILD_DIR/mongo/db/dbdirectclient', '$BUILD_DIR/mongo/db/dbhelpers', '$BUILD_DIR/mongo/db/index_d', - '$BUILD_DIR/mongo/db/index/index_descriptor', - '$BUILD_DIR/mongo/db/write_ops', - '$BUILD_DIR/mongo/db/catalog/catalog_helpers', - '$BUILD_DIR/mongo/db/namespace_string', + 'dbcheck', + 'repl_coordinator_interface', + ], +) + +env.Library( + target='oplogreader', + source=[ + 'oplogreader.cpp', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/client/clientdriver', + '$BUILD_DIR/mongo/db/auth/authcommon', + '$BUILD_DIR/mongo/db/auth/authorization_manager_global', + '$BUILD_DIR/mongo/util/net/network', ], ) @@ -37,12 +43,12 @@ env.CppUnitTest( 'oplog_test.cpp', ], LIBDEPS=[ - 'oplog', - 'oplog_entry', - 'replmocks', '$BUILD_DIR/mongo/db/service_context_d_test_fixture', '$BUILD_DIR/mongo/unittest/concurrency', '$BUILD_DIR/mongo/util/concurrency/thread_pool', + 'oplog', + 'oplog_entry', + 'replmocks', ], ) @@ -54,6 +60,7 @@ env.Library( LIBDEPS=[ 'oplog', 'oplog_interface_remote', + 'oplogreader', 'repl_coordinator_interface', 'repl_coordinator_global', '$BUILD_DIR/mongo/base', @@ -475,6 +482,7 @@ env.Library( 'oplog', 'replication_process', 'roll_back_local_operations', + '$BUILD_DIR/mongo/db/catalog/catalog_helpers', '$BUILD_DIR/mongo/db/catalog/database_holder', '$BUILD_DIR/mongo/db/s/sharding', '$BUILD_DIR/mongo/db/write_ops', diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp index 2f438ec436c..edd114366a4 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -505,7 +505,7 @@ void InitialSyncer::_chooseSyncSourceCallback( // There is no need to schedule separate task to create oplog collection since we are already in // a callback and we are certain there's no existing operation context (required for creating // collections and dropping user databases) attached to the current thread. - status = _recreateOplogAndDropReplicatedDatabases(); + status = _truncateOplogAndDropReplicatedDatabases(); if (!status.isOK()) { onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); return; @@ -527,9 +527,9 @@ void InitialSyncer::_chooseSyncSourceCallback( _getBaseRollbackIdHandle = scheduleResult.getValue(); } -Status InitialSyncer::_recreateOplogAndDropReplicatedDatabases() { - // drop/create oplog; drop user databases. - LOG(1) << "About to drop+create the oplog, if it exists, ns:" << _opts.localOplogNS +Status InitialSyncer::_truncateOplogAndDropReplicatedDatabases() { + // truncate oplog; drop user databases. + LOG(1) << "About to truncate the oplog, if it exists, ns:" << _opts.localOplogNS << ", and drop all user databases (so that we can clone them)."; auto opCtx = makeOpCtx(); @@ -537,23 +537,21 @@ Status InitialSyncer::_recreateOplogAndDropReplicatedDatabases() { // We are not replicating nor validating these writes. UnreplicatedWritesBlock unreplicatedWritesBlock(opCtx.get()); - // 1.) Drop the oplog. - LOG(2) << "Dropping the existing oplog: " << _opts.localOplogNS; - auto status = _storage->dropCollection(opCtx.get(), _opts.localOplogNS); + // 1.) Truncate the oplog. + LOG(2) << "Truncating the existing oplog: " << _opts.localOplogNS; + auto status = _storage->truncateCollection(opCtx.get(), _opts.localOplogNS); if (!status.isOK()) { - return status; + // 1a.) Create the oplog. + LOG(2) << "Creating the oplog: " << _opts.localOplogNS; + status = _storage->createOplog(opCtx.get(), _opts.localOplogNS); + if (!status.isOK()) { + return status; + } } // 2.) Drop user databases. - LOG(2) << "Dropping user databases"; - status = _storage->dropReplicatedDatabases(opCtx.get()); - if (!status.isOK()) { - return status; - } - - // 3.) Create the oplog. - LOG(2) << "Creating the oplog: " << _opts.localOplogNS; - return _storage->createOplog(opCtx.get(), _opts.localOplogNS); + LOG(2) << "Dropping user databases"; + return _storage->dropReplicatedDatabases(opCtx.get()); } void InitialSyncer::_rollbackCheckerResetCallback( diff --git a/src/mongo/db/repl/initial_syncer.h b/src/mongo/db/repl/initial_syncer.h index 8ee406d8876..63095486232 100644 --- a/src/mongo/db/repl/initial_syncer.h +++ b/src/mongo/db/repl/initial_syncer.h @@ -278,7 +278,7 @@ private: * | * | * V - * _recreateOplogAndDropReplicatedDatabases() + * _truncateOplogAndDropReplicatedDatabases() * | * | * V @@ -368,11 +368,10 @@ private: /** * This function does the following: - * 1.) Drop oplog. + * 1.) Truncate oplog. * 2.) Drop user databases (replicated dbs). - * 3.) Create oplog. */ - Status _recreateOplogAndDropReplicatedDatabases(); + Status _truncateOplogAndDropReplicatedDatabases(); /** * Callback for rollback checker's first replSetGetRBID command before starting data cloning. diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp index 274678a10bf..8d765cefc04 100644 --- a/src/mongo/db/repl/initial_syncer_test.cpp +++ b/src/mongo/db/repl/initial_syncer_test.cpp @@ -216,6 +216,7 @@ public: protected: struct StorageInterfaceResults { bool createOplogCalled = false; + bool truncateCalled = false; bool insertedOplogEntries = false; int oplogEntriesInserted = 0; bool droppedUserDBs = false; @@ -235,6 +236,12 @@ protected: _storageInterfaceWorkDone.createOplogCalled = true; return Status::OK(); }; + _storageInterface->truncateCollFn = [this](OperationContext* opCtx, + const NamespaceString& nss) { + LockGuard lock(_storageInterfaceWorkDoneMutex); + _storageInterfaceWorkDone.truncateCalled = true; + return Status::OK(); + }; _storageInterface->insertDocumentFn = [this]( OperationContext* opCtx, const NamespaceString& nss, const TimestampedBSONObj& doc) { LockGuard lock(_storageInterfaceWorkDoneMutex); @@ -921,14 +928,13 @@ TEST_F(InitialSyncerTest, InitialSyncerResetsOnCompletionCallbackFunctionPointer ASSERT_TRUE(sharedCallbackStateDestroyed); } -TEST_F(InitialSyncerTest, InitialSyncerRecreatesOplogAndDropsReplicatedDatabases) { - // We are not interested in proceeding beyond the oplog creation stage so we inject a failure - // after setting '_storageInterfaceWorkDone.createOplogCalled' to true. - auto oldCreateOplogFn = _storageInterface->createOplogFn; - _storageInterface->createOplogFn = [oldCreateOplogFn](OperationContext* opCtx, - const NamespaceString& nss) { - oldCreateOplogFn(opCtx, nss).transitional_ignore(); - return Status(ErrorCodes::OperationFailed, "oplog creation failed"); +TEST_F(InitialSyncerTest, InitialSyncerTruncatesOplogAndDropsReplicatedDatabases) { + // We are not interested in proceeding beyond the dropUserDB stage so we inject a failure + // after setting '_storageInterfaceWorkDone.droppedUserDBs' to true. + auto oldDropUserDBsFn = _storageInterface->dropUserDBsFn; + _storageInterface->dropUserDBsFn = [oldDropUserDBsFn](OperationContext* opCtx) { + ASSERT_OK(oldDropUserDBsFn(opCtx)); + return Status(ErrorCodes::OperationFailed, "drop userdbs failed"); }; auto initialSyncer = &getInitialSyncer(); @@ -941,8 +947,8 @@ TEST_F(InitialSyncerTest, InitialSyncerRecreatesOplogAndDropsReplicatedDatabases ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied); LockGuard lock(_storageInterfaceWorkDoneMutex); + ASSERT_TRUE(_storageInterfaceWorkDone.truncateCalled); ASSERT_TRUE(_storageInterfaceWorkDone.droppedUserDBs); - ASSERT_TRUE(_storageInterfaceWorkDone.createOplogCalled); } TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetRollbackIdScheduleError) { @@ -973,13 +979,13 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetRollbackIdScheduleError) TEST_F( InitialSyncerTest, InitialSyncerReturnsShutdownInProgressIfSchedulingRollbackCheckerFailedDueToExecutorShutdown) { - // The rollback id request is sent immediately after oplog creation. We shut the task executor - // down before returning from createOplog() to make the scheduleRemoteCommand() call for + // The rollback id request is sent immediately after oplog truncation. We shut the task executor + // down before returning from truncate() to make the scheduleRemoteCommand() call for // replSetGetRBID fail. - auto oldCreateOplogFn = _storageInterface->createOplogFn; - _storageInterface->createOplogFn = [oldCreateOplogFn, this](OperationContext* opCtx, - const NamespaceString& nss) { - auto status = oldCreateOplogFn(opCtx, nss); + auto oldTruncateCollFn = _storageInterface->truncateCollFn; + _storageInterface->truncateCollFn = [oldTruncateCollFn, this](OperationContext* opCtx, + const NamespaceString& nss) { + auto status = oldTruncateCollFn(opCtx, nss); getExecutor().shutdown(); return status; }; @@ -994,7 +1000,7 @@ TEST_F( ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, _lastApplied); LockGuard lock(_storageInterfaceWorkDoneMutex); - ASSERT_TRUE(_storageInterfaceWorkDone.createOplogCalled); + ASSERT_TRUE(_storageInterfaceWorkDone.truncateCalled); } TEST_F(InitialSyncerTest, InitialSyncerCancelsRollbackCheckerOnShutdown) { diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 4508ef2621f..a3fc12b7adb 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -116,13 +116,17 @@ std::string masterSlaveOplogName = "local.oplog.$main"; MONGO_FP_DECLARE(disableSnapshotting); namespace { -// cached copy...so don't rename, drop, etc.!!! +/** + * The `_localOplogCollection` pointer is always valid (or null) because an + * operation must take the global exclusive lock to set the pointer to null when + * the Collection instance is destroyed. See `oplogCheckCloseDatabase`. + */ Collection* _localOplogCollection = nullptr; PseudoRandom hashGenerator(std::unique_ptr<SecureRandom>(SecureRandom::create())->nextInt64()); -// Synchronizes the section where a new Timestamp is generated and when it actually -// appears in the oplog. +// Synchronizes the section where a new Timestamp is generated and when it is registered in the +// storage engine. stdx::mutex newOpMutex; stdx::condition_variable newTimestampNotifier; // Remembers that last timestamp generated for creating new oplog entries or last timestamp of @@ -137,27 +141,16 @@ void checkOplogInsert(Status result) { massert(17322, str::stream() << "write to oplog failed: " << result.toString(), result.isOK()); } -struct OplogSlot { - OpTime opTime; - int64_t hash = 0; -}; - -/** - * Allocates an optime for a new entry in the oplog, and updates the replication coordinator to - * reflect that new optime. Returns the new optime and the correct value of the "h" field for - * the new oplog entry. - */ -void getNextOpTime(OperationContext* opCtx, - Collection* oplog, - ReplicationCoordinator* replCoord, - ReplicationCoordinator::Mode replicationMode, - unsigned count, - OplogSlot* slotsOut) { - synchronizeOnCappedInFlightResource(opCtx->lockState(), oplog->ns()); +void _getNextOpTimes(OperationContext* opCtx, + Collection* oplog, + std::size_t count, + OplogSlot* slotsOut) { + synchronizeOnOplogInFlightResource(opCtx->lockState()); + auto replCoord = ReplicationCoordinator::get(opCtx); long long term = OpTime::kUninitializedTerm; // Fetch term out of the newOpMutex. - if (replicationMode == ReplicationCoordinator::modeReplSet && + if (replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet && replCoord->isV1ElectionProtocol()) { // Current term. If we're not a replset of pv=1, it remains kOldProtocolVersionTerm. term = replCoord->getTerm(); @@ -172,8 +165,8 @@ void getNextOpTime(OperationContext* opCtx, fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(opCtx, ts)); // Set hash if we're in replset mode, otherwise it remains 0 in master/slave. - const bool needHash = (replicationMode == ReplicationCoordinator::modeReplSet); - for (unsigned i = 0; i < count; i++) { + const bool needHash = (replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet); + for (std::size_t i = 0; i < count; i++) { slotsOut[i].opTime = {Timestamp(ts.asULL() + i), term}; if (needHash) { slotsOut[i].hash = hashGenerator.nextInt64(); @@ -222,11 +215,18 @@ private: } // namespace void setOplogCollectionName() { - if (getGlobalReplicationCoordinator()->getReplicationMode() == - ReplicationCoordinator::modeReplSet) { - _oplogCollectionName = NamespaceString::kRsOplogNamespace.ns(); - } else { - _oplogCollectionName = masterSlaveOplogName; + switch (getGlobalReplicationCoordinator()->getReplicationMode()) { + case ReplicationCoordinator::modeReplSet: + _oplogCollectionName = NamespaceString::kRsOplogNamespace.ns(); + break; + case ReplicationCoordinator::modeMasterSlave: + _oplogCollectionName = masterSlaveOplogName; + break; + case ReplicationCoordinator::modeNone: + // leave empty. + break; + default: + MONGO_UNREACHABLE; } } @@ -275,21 +275,6 @@ void createIndexForApplyOps(OperationContext* opCtx, namespace { -Collection* getLocalOplogCollection(OperationContext* opCtx, - const std::string& oplogCollectionName) { - if (_localOplogCollection) - return _localOplogCollection; - - AutoGetCollection autoColl(opCtx, NamespaceString(oplogCollectionName), MODE_IX); - _localOplogCollection = autoColl.getCollection(); - massert(13347, - "the oplog collection " + oplogCollectionName + - " missing. did you drop it? if so, restart the server", - _localOplogCollection); - - return _localOplogCollection; -} - /** * Attaches the session information of a write to an oplog entry if it exists. */ @@ -392,11 +377,9 @@ void _logOpsInner(OperationContext* opCtx, Timestamp* timestamps, size_t nDocs, Collection* oplogCollection, - ReplicationCoordinator::Mode replicationMode, OpTime finalOpTime) { - ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); - - if (nss.size() && replicationMode == ReplicationCoordinator::modeReplSet && + auto replCoord = ReplicationCoordinator::get(opCtx); + if (nss.size() && replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet && !replCoord->canAcceptWritesFor(opCtx, nss)) { severe() << "logOp() but can't accept write to collection " << nss.ns(); fassertFailed(17405); @@ -431,12 +414,11 @@ OpTime logOp(OperationContext* opCtx, Lock::DBLock lk(opCtx, NamespaceString::kLocalDb, MODE_IX); Lock::CollectionLock lock(opCtx->lockState(), _oplogCollectionName, MODE_IX); - auto const oplog = getLocalOplogCollection(opCtx, _oplogCollectionName); - auto const replMode = replCoord->getReplicationMode(); + auto const oplog = _localOplogCollection; OplogSlot slot; WriteUnitOfWork wuow(opCtx); - getNextOpTime(opCtx, oplog, replCoord, replMode, 1, &slot); + _getNextOpTimes(opCtx, oplog, 1, &slot); auto writer = _logOpWriter(opCtx, opstr, @@ -453,7 +435,7 @@ OpTime logOp(OperationContext* opCtx, oplogLink); const DocWriter* basePtr = &writer; auto timestamp = slot.opTime.getTimestamp(); - _logOpsInner(opCtx, nss, &basePtr, ×tamp, 1, oplog, replMode, slot.opTime); + _logOpsInner(opCtx, nss, &basePtr, ×tamp, 1, oplog, slot.opTime); wuow.commit(); return slot.opTime; } @@ -476,14 +458,11 @@ repl::OpTime logInsertOps(OperationContext* opCtx, const size_t count = end - begin; std::vector<OplogDocWriter> writers; writers.reserve(count); - Collection* oplog = getLocalOplogCollection(opCtx, _oplogCollectionName); + Collection* oplog = _localOplogCollection; Lock::DBLock lk(opCtx, "local", MODE_IX); Lock::CollectionLock lock(opCtx->lockState(), _oplogCollectionName, MODE_IX); - std::unique_ptr<OplogSlot[]> slots(new OplogSlot[count]); - auto replMode = replCoord->getReplicationMode(); WriteUnitOfWork wuow(opCtx); - getNextOpTime(opCtx, oplog, replCoord, replMode, count, slots.get()); auto wallTime = Date_t::now(); OplogLink oplogLink; @@ -496,39 +475,40 @@ repl::OpTime logInsertOps(OperationContext* opCtx, } auto timestamps = stdx::make_unique<Timestamp[]>(count); + OpTime lastOpTime; for (size_t i = 0; i < count; i++) { - auto insertStatement = begin[i]; + // Make a mutable copy. + auto insertStatementOplogSlot = begin[i].oplogSlot; + // Fetch optime now, if not already fetched. + if (insertStatementOplogSlot.opTime.isNull()) { + _getNextOpTimes(opCtx, oplog, 1, &insertStatementOplogSlot); + } writers.emplace_back(_logOpWriter(opCtx, "i", nss, uuid, - insertStatement.doc, + begin[i].doc, NULL, fromMigrate, - slots[i].opTime, - slots[i].hash, + insertStatementOplogSlot.opTime, + insertStatementOplogSlot.hash, wallTime, sessionInfo, - insertStatement.stmtId, + begin[i].stmtId, oplogLink)); - oplogLink.prevTs = slots[i].opTime.getTimestamp(); - timestamps[i] = slots[i].opTime.getTimestamp(); + oplogLink.prevTs = insertStatementOplogSlot.opTime.getTimestamp(); + timestamps[i] = oplogLink.prevTs; + lastOpTime = insertStatementOplogSlot.opTime; } std::unique_ptr<DocWriter const* []> basePtrs(new DocWriter const*[count]); for (size_t i = 0; i < count; i++) { basePtrs[i] = &writers[i]; } - _logOpsInner(opCtx, - nss, - basePtrs.get(), - timestamps.get(), - count, - oplog, - replMode, - slots[count - 1].opTime); + invariant(!lastOpTime.isNull()); + _logOpsInner(opCtx, nss, basePtrs.get(), timestamps.get(), count, oplog, lastOpTime); wuow.commit(); - return slots[count - 1].opTime; + return lastOpTime; } namespace { @@ -599,7 +579,7 @@ void createOplog(OperationContext* opCtx, const std::string& oplogCollectionName throw AssertionException(13257, ss.str()); } } - + acquireOplogCollectionForLogging(opCtx); if (!isReplSet) initTimestampFromOplog(opCtx, oplogCollectionName); return; @@ -619,6 +599,7 @@ void createOplog(OperationContext* opCtx, const std::string& oplogCollectionName writeConflictRetry(opCtx, "createCollection", oplogCollectionName, [&] { WriteUnitOfWork uow(opCtx); invariant(ctx.db()->createCollection(opCtx, oplogCollectionName, options)); + acquireOplogCollectionForLogging(opCtx); if (!isReplSet) getGlobalServiceContext()->getOpObserver()->onOpMessage(opCtx, BSONObj()); uow.commit(); @@ -627,6 +608,7 @@ void createOplog(OperationContext* opCtx, const std::string& oplogCollectionName /* sync here so we don't get any surprising lag later when we try to sync */ StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine(); storageEngine->flushAllFiles(opCtx, true); + log() << "******" << endl; } @@ -636,6 +618,14 @@ void createOplog(OperationContext* opCtx) { createOplog(opCtx, _oplogCollectionName, isReplSet); } +void getNextOpTimes(OperationContext* opCtx, std::size_t count, OplogSlot* slotsOut) { + // The local oplog collection pointer must already be established by this point. + // We can't establish it here because that would require locking the local database, which would + // be a lock order violation. + invariant(_localOplogCollection); + _getNextOpTimes(opCtx, _localOplogCollection, count, slotsOut); +} + // ------------------------------------- namespace { @@ -1317,6 +1307,7 @@ void setNewTimestamp(ServiceContext* service, const Timestamp& newTime) { void initTimestampFromOplog(OperationContext* opCtx, const std::string& oplogNS) { DBDirectClient c(opCtx); + static const BSONObj reverseNaturalObj = BSON("$natural" << -1); BSONObj lastOp = c.findOne(oplogNS, Query().sort(reverseNaturalObj), NULL, QueryOption_SlaveOk); if (!lastOp.isEmpty()) { @@ -1328,8 +1319,18 @@ void initTimestampFromOplog(OperationContext* opCtx, const std::string& oplogNS) void oplogCheckCloseDatabase(OperationContext* opCtx, Database* db) { invariant(opCtx->lockState()->isW()); + if (db->name() == "local") { + _localOplogCollection = nullptr; + } +} + - _localOplogCollection = nullptr; +void acquireOplogCollectionForLogging(OperationContext* opCtx) { + if (!_oplogCollectionName.empty()) { + AutoGetCollection autoColl(opCtx, NamespaceString(_oplogCollectionName), MODE_IX); + _localOplogCollection = autoColl.getCollection(); + fassert(13347, _localOplogCollection); + } } void signalOplogWaiters() { @@ -1429,12 +1430,10 @@ void SnapshotThread::run() { SnapshotName name(0); // assigned real value in block. { - // Make sure there are no in-flight capped inserts while we create our snapshot. + // Make sure there are no in-flight oplog inserts while we create our snapshot. // This lock cannot be aquired until all writes holding the resource commit/abort. - Lock::ResourceLock cappedInsertLockForOtherDb( - opCtx->lockState(), resourceCappedInFlightForOtherDb, MODE_X); - Lock::ResourceLock cappedInsertLockForLocalDb( - opCtx->lockState(), resourceCappedInFlightForLocalDb, MODE_X); + Lock::ResourceLock cappedInsertLockForOplog( + opCtx->lockState(), resourceInFlightForOplog, MODE_X); // Reserve the name immediately before we take our snapshot. This ensures that all // names that compare lower must be from points in time visible to this named diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index 812efb81310..2198bb86c7e 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -34,10 +34,11 @@ #include "mongo/base/status.h" #include "mongo/bson/bsonobj.h" #include "mongo/bson/timestamp.h" -#include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/collection_options.h" +#include "mongo/db/logical_session_id.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/storage/snapshot_name.h" #include "mongo/stdx/functional.h" namespace mongo { @@ -48,6 +49,30 @@ class OperationContext; class OperationSessionInfo; class Session; +struct OplogSlot { + OplogSlot() {} + OplogSlot(repl::OpTime opTime, std::int64_t hash) : opTime(opTime), hash(hash) {} + repl::OpTime opTime; + std::int64_t hash = 0; +}; + +struct InsertStatement { +public: + InsertStatement() = default; + explicit InsertStatement(BSONObj toInsert) : doc(toInsert) {} + + InsertStatement(StmtId statementId, BSONObj toInsert) : stmtId(statementId), doc(toInsert) {} + InsertStatement(StmtId statementId, BSONObj toInsert, OplogSlot os) + : stmtId(statementId), oplogSlot(os), doc(toInsert) {} + InsertStatement(BSONObj toInsert, SnapshotName ts) + : oplogSlot(repl::OpTime(Timestamp(ts.asU64()), repl::OpTime::kUninitializedTerm), 0), + doc(toInsert) {} + + StmtId stmtId = kUninitializedStmtId; + OplogSlot oplogSlot; + BSONObj doc; +}; + namespace repl { class ReplSettings; @@ -118,10 +143,15 @@ OpTime logOp(OperationContext* opCtx, StmtId stmtId, const OplogLink& oplogLink); -// Flush out the cached pointers to the local database and oplog. +// Flush out the cached pointer to the oplog. // Used by the closeDatabase command to ensure we don't cache closed things. void oplogCheckCloseDatabase(OperationContext* opCtx, Database* db); +/** + * Establish the cached pointer to the local oplog. + */ +void acquireOplogCollectionForLogging(OperationContext* opCtx); + using IncrementOpsAppliedStatsFn = stdx::function<void()>; /** * Take the object field of a BSONObj, the BSONObj, and the namespace of @@ -184,5 +214,11 @@ void createIndexForApplyOps(OperationContext* opCtx, const NamespaceString& indexNss, IncrementOpsAppliedStatsFn incrementOpsAppliedStats); +/** + * Allocates optimes for new entries in the oplog. Returns an array of OplogSlots, which contain + * the new optimes along with their terms and newly calculated hash fields. + */ +void getNextOpTimes(OperationContext* opCtx, std::size_t count, OplogSlot* slotsOut); + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp index 744045df7f3..659e061c554 100644 --- a/src/mongo/db/repl/oplogreader.cpp +++ b/src/mongo/db/repl/oplogreader.cpp @@ -50,8 +50,6 @@ using std::string; namespace repl { -const BSONObj reverseNaturalObj = BSON("$natural" << -1); - bool replAuthenticate(DBClientBase* conn) { if (isInternalAuthSet()) return conn->authenticateInternalUser(); diff --git a/src/mongo/db/repl/oplogreader.h b/src/mongo/db/repl/oplogreader.h index 7307d14e406..92e224a6498 100644 --- a/src/mongo/db/repl/oplogreader.h +++ b/src/mongo/db/repl/oplogreader.h @@ -37,9 +37,6 @@ namespace mongo { namespace repl { -// {"$natural": -1 } -extern const BSONObj reverseNaturalObj; - /** * Authenticates conn using the server's cluster-membership credentials. * diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 06b78d50ef5..674ec4d5940 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -224,6 +224,9 @@ bool ReplicationCoordinatorExternalStateImpl::isInitialSyncFlagSet(OperationCont void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication( OperationContext* opCtx, ReplicationCoordinator* replCoord) { + // Initialize the cached pointer to the oplog collection, for writing to the oplog. + acquireOplogCollectionForLogging(opCtx); + LockGuard lk(_threadMutex); invariant(replCoord); invariant(!_bgSync); diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h index 840b40e2c16..309b1715aa0 100644 --- a/src/mongo/db/repl/storage_interface.h +++ b/src/mongo/db/repl/storage_interface.h @@ -144,11 +144,16 @@ public: const CollectionOptions& options) = 0; /** - * Drops a collection, like the oplog. + * Drops a collection. */ virtual Status dropCollection(OperationContext* opCtx, const NamespaceString& nss) = 0; /** + * Truncates a collection. + */ + virtual Status truncateCollection(OperationContext* opCtx, const NamespaceString& nss) = 0; + + /** * Renames a collection from the "fromNS" to the "toNS". Fails if the new collection already * exists. */ diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index db090542ed6..e9f6b002cf6 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -411,10 +411,32 @@ Status StorageInterfaceImpl::dropCollection(OperationContext* opCtx, const Names } WriteUnitOfWork wunit(opCtx); const auto status = autoDB.getDb()->dropCollectionEvenIfSystem(opCtx, nss); - if (status.isOK()) { - wunit.commit(); + if (!status.isOK()) { + return status; } - return status; + wunit.commit(); + return Status::OK(); + }); +} + +Status StorageInterfaceImpl::truncateCollection(OperationContext* opCtx, + const NamespaceString& nss) { + return writeConflictRetry(opCtx, "StorageInterfaceImpl::truncateCollection", nss.ns(), [&] { + AutoGetCollection autoColl(opCtx, nss, MODE_X); + auto collectionResult = + getCollection(autoColl, nss, "The collection must exist before truncating."); + if (!collectionResult.isOK()) { + return collectionResult.getStatus(); + } + auto collection = collectionResult.getValue(); + + WriteUnitOfWork wunit(opCtx); + const auto status = collection->truncate(opCtx); + if (!status.isOK()) { + return status; + } + wunit.commit(); + return Status::OK(); }); } diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h index 2b8ba24f022..dfb367f398d 100644 --- a/src/mongo/db/repl/storage_interface_impl.h +++ b/src/mongo/db/repl/storage_interface_impl.h @@ -84,6 +84,8 @@ public: Status dropCollection(OperationContext* opCtx, const NamespaceString& nss) override; + Status truncateCollection(OperationContext* opCtx, const NamespaceString& nss) override; + Status renameCollection(OperationContext* opCtx, const NamespaceString& fromNS, const NamespaceString& toNS, diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h index a0cd570cf8d..18902950699 100644 --- a/src/mongo/db/repl/storage_interface_mock.h +++ b/src/mongo/db/repl/storage_interface_mock.h @@ -99,6 +99,8 @@ public: stdx::function<Status(OperationContext* opCtx, const NamespaceString& nss)>; using CreateCollectionFn = stdx::function<Status( OperationContext* opCtx, const NamespaceString& nss, const CollectionOptions& options)>; + using TruncateCollectionFn = + stdx::function<Status(OperationContext* opCtx, const NamespaceString& nss)>; using DropCollectionFn = stdx::function<Status(OperationContext* opCtx, const NamespaceString& nss)>; using FindDocumentsFn = @@ -168,6 +170,10 @@ public: return dropCollFn(opCtx, nss); }; + Status truncateCollection(OperationContext* opCtx, const NamespaceString& nss) override { + return truncateCollFn(opCtx, nss); + } + Status renameCollection(OperationContext* opCtx, const NamespaceString& fromNS, const NamespaceString& toNS, @@ -292,6 +298,9 @@ public: [](OperationContext* opCtx, const NamespaceString& nss, const CollectionOptions& options) { return Status{ErrorCodes::IllegalOperation, "CreateCollectionFn not implemented."}; }; + TruncateCollectionFn truncateCollFn = [](OperationContext* opCtx, const NamespaceString& nss) { + return Status{ErrorCodes::IllegalOperation, "TruncateCollectionFn not implemented."}; + }; DropCollectionFn dropCollFn = [](OperationContext* opCtx, const NamespaceString& nss) { return Status{ErrorCodes::IllegalOperation, "DropCollectionFn not implemented."}; }; |