diff options
author | Eric Milkie <milkie@10gen.com> | 2017-08-29 13:51:15 -0400 |
---|---|---|
committer | Eric Milkie <milkie@10gen.com> | 2017-09-12 16:04:57 -0400 |
commit | 6264d36ac6002b296aa41b8dc79400fcc2cbdd74 (patch) | |
tree | f1c0971779064bbf6c3fde535b7666d8738f236d /src/mongo | |
parent | 978521eb3926867b30903781fd89d4acd931f0c4 (diff) | |
download | mongo-6264d36ac6002b296aa41b8dc79400fcc2cbdd74.tar.gz |
SERVER-30827 SERVER-30639 Timestamp bulk writes via changes to optime generator
Diffstat (limited to 'src/mongo')
30 files changed, 328 insertions, 217 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 204dcfe02fc..4128794d1c1 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -594,12 +594,14 @@ env.Library( 'curop_metrics', 'diag_log', 'lasterror', - 'write_ops', 'ops/write_ops_parsers', 'rw_concern_d', 's/sharding', 'storage/storage_options', ], + LIBDEPS_PRIVATE=[ + 'ops/write_ops_exec', + ], ) env.Library( @@ -735,6 +737,9 @@ env.Library( '$BUILD_DIR/mongo/db/catalog/database', "$BUILD_DIR/mongo/db/storage/mmap_v1/storage_mmapv1", ], + LIBDEPS_PRIVATE=[ + "$BUILD_DIR/mongo/db/repl/oplog", + ], ) env.Library( @@ -1505,7 +1510,6 @@ env.Library( 'ops/update.cpp', 'ops/update_lifecycle_impl.cpp', 'ops/update_result.cpp', - 'ops/write_ops_exec.cpp', 'ops/write_ops_retryability.cpp', 'session.cpp', 'session_catalog.cpp', diff --git a/src/mongo/db/catalog/SConscript b/src/mongo/db/catalog/SConscript index 1a06fe1ba80..6480a091972 100644 --- a/src/mongo/db/catalog/SConscript +++ b/src/mongo/db/catalog/SConscript @@ -229,22 +229,23 @@ env.Library( 'index_create', 'index_key_validate', '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/background', '$BUILD_DIR/mongo/db/clientcursor', + '$BUILD_DIR/mongo/db/collection_index_usage_tracker', '$BUILD_DIR/mongo/db/concurrency/lock_manager', '$BUILD_DIR/mongo/db/curop', + '$BUILD_DIR/mongo/db/db_raii', + '$BUILD_DIR/mongo/db/index/index_access_methods', '$BUILD_DIR/mongo/db/query/query', '$BUILD_DIR/mongo/db/repl/drop_pending_collection_reaper', + '$BUILD_DIR/mongo/db/repl/oplog', '$BUILD_DIR/mongo/db/repl/serveronly', + '$BUILD_DIR/mongo/db/s/balancer', '$BUILD_DIR/mongo/db/service_context', - '$BUILD_DIR/mongo/db/storage/mmap_v1/storage_mmapv1', '$BUILD_DIR/mongo/db/storage/key_string', + '$BUILD_DIR/mongo/db/storage/mmap_v1/storage_mmapv1', '$BUILD_DIR/mongo/db/system_index', '$BUILD_DIR/mongo/db/ttl_collection_cache', - '$BUILD_DIR/mongo/db/collection_index_usage_tracker', - '$BUILD_DIR/mongo/db/background', - 
'$BUILD_DIR/mongo/db/db_raii', - '$BUILD_DIR/mongo/db/index/index_access_methods', - '$BUILD_DIR/mongo/db/s/balancer', '$BUILD_DIR/mongo/db/views/views_mongod', ], ) diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h index df30097f6ca..fd0f9b3b1b4 100644 --- a/src/mongo/db/catalog/collection.h +++ b/src/mongo/db/catalog/collection.h @@ -48,6 +48,7 @@ #include "mongo/db/op_observer.h" #include "mongo/db/query/collation/collator_interface.h" #include "mongo/db/record_id.h" +#include "mongo/db/repl/oplog.h" #include "mongo/db/storage/capped_callback.h" #include "mongo/db/storage/record_store.h" #include "mongo/db/storage/snapshot.h" @@ -96,21 +97,6 @@ struct CompactStats { long long corruptDocuments = 0; }; -struct InsertStatement { -public: - InsertStatement() = default; - explicit InsertStatement(BSONObj toInsert) : doc(toInsert) {} - - InsertStatement(StmtId statementId, BSONObj toInsert) : stmtId(statementId), doc(toInsert) {} - InsertStatement(StmtId statementId, BSONObj toInsert, SnapshotName ts) - : stmtId(statementId), timestamp(ts), doc(toInsert) {} - InsertStatement(BSONObj toInsert, SnapshotName ts) : timestamp(ts), doc(toInsert) {} - - StmtId stmtId = kUninitializedStmtId; - SnapshotName timestamp = SnapshotName(); - BSONObj doc; -}; - /** * Queries with the awaitData option use this notifier object to wait for more data to be * inserted into the capped collection. diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp index 5896b3b9f46..d78c3e6f43d 100644 --- a/src/mongo/db/catalog/collection_impl.cpp +++ b/src/mongo/db/catalog/collection_impl.cpp @@ -170,7 +170,6 @@ CollectionImpl::CollectionImpl(Collection* _this_init, _cursorManager(_ns), _cappedNotifier(_recordStore->isCapped() ? 
stdx::make_unique<CappedInsertNotifier>() : nullptr), - _mustTakeCappedLockOnInsert(isCapped() && !_ns.isSystemDotProfile() && !_ns.isOplog()), _this(_this_init) {} void CollectionImpl::init(OperationContext* opCtx) { @@ -305,7 +304,6 @@ Status CollectionImpl::insertDocumentsForOplog(OperationContext* opCtx, // because it would defeat the purpose of using DocWriter. invariant(!_validator); invariant(!_indexCatalog.haveAnyIndexes()); - invariant(!_mustTakeCappedLockOnInsert); Status status = _recordStore->insertRecordsWithDocWriter(opCtx, docs, timestamps, nDocs); if (!status.isOK()) @@ -355,9 +353,6 @@ Status CollectionImpl::insertDocuments(OperationContext* opCtx, const SnapshotId sid = opCtx->recoveryUnit()->getSnapshotId(); - if (_mustTakeCappedLockOnInsert) - synchronizeOnCappedInFlightResource(opCtx->lockState(), _ns); - Status status = _insertDocuments(opCtx, begin, end, enforceQuota, opDebug); if (!status.isOK()) return status; @@ -407,8 +402,6 @@ Status CollectionImpl::insertDocument(OperationContext* opCtx, dassert(opCtx->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IX)); - if (_mustTakeCappedLockOnInsert) - synchronizeOnCappedInFlightResource(opCtx->lockState(), _ns); // TODO SERVER-30638: using timestamp 0 for these inserts, which are non-oplog so we don't yet // care about their correct timestamps. StatusWith<RecordId> loc = _recordStore->insertRecord( @@ -425,7 +418,14 @@ Status CollectionImpl::insertDocument(OperationContext* opCtx, } vector<InsertStatement> inserts; - inserts.emplace_back(doc); + OplogSlot slot; + // Fetch a new optime now, if necessary. + auto replCoord = repl::ReplicationCoordinator::get(opCtx); + if (!replCoord->isOplogDisabledFor(opCtx, _ns)) { + // Populate 'slot' with a new optime. 
+ repl::getNextOpTimes(opCtx, 1, &slot); + } + inserts.emplace_back(kUninitializedStmtId, doc, slot); getGlobalServiceContext()->getOpObserver()->onInserts( opCtx, ns(), uuid(), inserts.begin(), inserts.end(), false); @@ -468,7 +468,7 @@ Status CollectionImpl::_insertDocuments(OperationContext* opCtx, for (auto it = begin; it != end; it++) { Record record = {RecordId(), RecordData(it->doc.objdata(), it->doc.objsize())}; records.push_back(record); - Timestamp timestamp = Timestamp(it->timestamp.asU64()); + Timestamp timestamp = Timestamp(it->oplogSlot.opTime.getTimestamp()); timestamps.push_back(timestamp); } Status status = diff --git a/src/mongo/db/catalog/collection_impl.h b/src/mongo/db/catalog/collection_impl.h index 4a66cb1a8bc..f8db017330e 100644 --- a/src/mongo/db/catalog/collection_impl.h +++ b/src/mongo/db/catalog/collection_impl.h @@ -464,8 +464,6 @@ private: // This is non-null if and only if the collection is a capped collection. const std::shared_ptr<CappedInsertNotifier> _cappedNotifier; - const bool _mustTakeCappedLockOnInsert; - // The earliest snapshot that is allowed to use this collection. boost::optional<SnapshotName> _minVisibleSnapshot; diff --git a/src/mongo/db/catalog/database_impl.cpp b/src/mongo/db/catalog/database_impl.cpp index d57e4352b90..6096e81106f 100644 --- a/src/mongo/db/catalog/database_impl.cpp +++ b/src/mongo/db/catalog/database_impl.cpp @@ -161,7 +161,7 @@ void DatabaseImpl::close(OperationContext* opCtx, const std::string& reason) { // XXX? - Do we need to close database under global lock or just DB-lock is sufficient ? invariant(opCtx->lockState()->isW()); - // oplog caches some things, dirty its caches + // Clear cache of oplog Collection pointer. 
repl::oplogCheckCloseDatabase(opCtx, this->_this); if (BackgroundOperation::inProgForDb(_name)) { diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index 527ae13670c..3efcaa19f17 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -216,6 +216,7 @@ env.Library( '$BUILD_DIR/mongo/db/ops/write_ops_parsers', '$BUILD_DIR/mongo/db/pipeline/serveronly', '$BUILD_DIR/mongo/db/repair_database', + '$BUILD_DIR/mongo/db/repl/dbcheck', '$BUILD_DIR/mongo/db/repl/oplog', '$BUILD_DIR/mongo/db/repl/isself', '$BUILD_DIR/mongo/db/repl/repl_coordinator_impl', @@ -232,6 +233,9 @@ env.Library( 'killcursors_common', 'write_commands_common', ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/ops/write_ops_exec', + ], ) env.Library( diff --git a/src/mongo/db/concurrency/d_concurrency.cpp b/src/mongo/db/concurrency/d_concurrency.cpp index d5d9fe1684c..0876f5f4f8e 100644 --- a/src/mongo/db/concurrency/d_concurrency.cpp +++ b/src/mongo/db/concurrency/d_concurrency.cpp @@ -311,20 +311,9 @@ void Lock::ResourceLock::unlock() { } } -void synchronizeOnCappedInFlightResource(Locker* lockState, const NamespaceString& cappedNs) { +void synchronizeOnOplogInFlightResource(Locker* lockState) { dassert(lockState->inAWriteUnitOfWork()); - const ResourceId resource = cappedNs.db() == "local" ? resourceCappedInFlightForLocalDb - : resourceCappedInFlightForOtherDb; - - // It is illegal to acquire the capped in-flight lock for non-local dbs while holding the - // capped in-flight lock for the local db. (Unless we already hold the otherDb lock since - // reacquiring a lock in the same mode never blocks.) 
- if (resource == resourceCappedInFlightForOtherDb) { - dassert(!lockState->isLockHeldForMode(resourceCappedInFlightForLocalDb, MODE_IX) || - lockState->isLockHeldForMode(resourceCappedInFlightForOtherDb, MODE_IX)); - } - - Lock::ResourceLock heldUntilEndOfWUOW{lockState, resource, MODE_IX}; + Lock::ResourceLock heldUntilEndOfWUOW{lockState, resourceInFlightForOplog, MODE_IX}; } } // namespace mongo diff --git a/src/mongo/db/concurrency/d_concurrency.h b/src/mongo/db/concurrency/d_concurrency.h index 44be439d539..6b0cf4e9547 100644 --- a/src/mongo/db/concurrency/d_concurrency.h +++ b/src/mongo/db/concurrency/d_concurrency.h @@ -384,8 +384,8 @@ public: /** * Takes a lock on resourceCappedInFlight in MODE_IX which will be held until the end of your - * WUOW. This ensures that a MODE_X lock on this resource will wait for all in-flight capped + * WUOW. This ensures that a MODE_X lock on this resource will wait for all in-flight oplog * inserts to either commit or rollback and block new ones from starting. 
*/ -void synchronizeOnCappedInFlightResource(Locker* opCtx, const NamespaceString& cappedNs); +void synchronizeOnOplogInFlightResource(Locker* opCtx); } diff --git a/src/mongo/db/concurrency/lock_manager_defs.h b/src/mongo/db/concurrency/lock_manager_defs.h index e1b72f7b1df..5951955eeb1 100644 --- a/src/mongo/db/concurrency/lock_manager_defs.h +++ b/src/mongo/db/concurrency/lock_manager_defs.h @@ -188,8 +188,7 @@ public: SINGLETON_PARALLEL_BATCH_WRITER_MODE, SINGLETON_GLOBAL, SINGLETON_MMAPV1_FLUSH, - SINGLETON_CAPPED_IN_FLIGHT_OTHER_DB, - SINGLETON_CAPPED_IN_FLIGHT_LOCAL_DB, + SINGLETON_IN_FLIGHT_OPLOG, }; ResourceId() : _fullHash(0) {} @@ -265,17 +264,12 @@ extern const ResourceId resourceIdAdminDB; // TODO: Merge this with resourceIdGlobal extern const ResourceId resourceIdParallelBatchWriterMode; -// Everywhere that starts in-flight capped inserts which allocate capped collection RecordIds in -// a way that could trigger hiding of newer records takes this lock in MODE_IX and holds it -// until the end of their WriteUnitOfWork. The localDb resource is for capped collections in the -// local database (including the oplog). The otherDb resource is for capped collections in any other -// database. +// Every place that starts oplog inserts takes this lock in MODE_IX and holds it +// until the end of their WriteUnitOfWork. // -// Threads that need a consistent view of the world can lock both of these in MODE_X to prevent -// concurrent in-flight capped inserts. The otherDb resource must be acquired before the localDb -// resource. -extern const ResourceId resourceCappedInFlightForLocalDb; -extern const ResourceId resourceCappedInFlightForOtherDb; +// Threads that need a consistent view of the world can lock this in MODE_X to prevent +// concurrent in-flight oplog inserts. +extern const ResourceId resourceInFlightForOplog; /** * Interface on which granted lock requests will be notified. 
See the contract for the notify diff --git a/src/mongo/db/concurrency/lock_state.cpp b/src/mongo/db/concurrency/lock_state.cpp index 721067e3aad..5409c1190f6 100644 --- a/src/mongo/db/concurrency/lock_state.cpp +++ b/src/mongo/db/concurrency/lock_state.cpp @@ -964,9 +964,7 @@ const ResourceId resourceIdOplog = ResourceId(RESOURCE_COLLECTION, StringData("l const ResourceId resourceIdAdminDB = ResourceId(RESOURCE_DATABASE, StringData("admin")); const ResourceId resourceIdParallelBatchWriterMode = ResourceId(RESOURCE_GLOBAL, ResourceId::SINGLETON_PARALLEL_BATCH_WRITER_MODE); -const ResourceId resourceCappedInFlightForLocalDb = - ResourceId(RESOURCE_METADATA, ResourceId::SINGLETON_CAPPED_IN_FLIGHT_LOCAL_DB); -const ResourceId resourceCappedInFlightForOtherDb = - ResourceId(RESOURCE_METADATA, ResourceId::SINGLETON_CAPPED_IN_FLIGHT_OTHER_DB); +const ResourceId resourceInFlightForOplog = + ResourceId(RESOURCE_METADATA, ResourceId::SINGLETON_IN_FLIGHT_OPLOG); } // namespace mongo diff --git a/src/mongo/db/keys_collection_manager_sharding_test.cpp b/src/mongo/db/keys_collection_manager_sharding_test.cpp index d7af13a09fc..d7c88d097f3 100644 --- a/src/mongo/db/keys_collection_manager_sharding_test.cpp +++ b/src/mongo/db/keys_collection_manager_sharding_test.cpp @@ -63,6 +63,10 @@ protected: serverGlobalParams.featureCompatibility.validateFeaturesAsMaster.store(true); auto clockSource = stdx::make_unique<ClockSourceMock>(); + // Timestamps of "0 seconds" are not allowed, so we must advance our clock mock to the first + // real second. 
+ clockSource->advance(Seconds(1)); + operationContext()->getServiceContext()->setFastClockSource(std::move(clockSource)); auto catalogClient = Grid::get(operationContext())->catalogClient(); _keyManager = diff --git a/src/mongo/db/logical_clock_test.cpp b/src/mongo/db/logical_clock_test.cpp index bf3b78a3062..4dd64809a86 100644 --- a/src/mongo/db/logical_clock_test.cpp +++ b/src/mongo/db/logical_clock_test.cpp @@ -133,7 +133,7 @@ TEST_F(LogicalClockTest, InitFromTrustedSourceCanAcceptVeryOldLogicalTime) { // Verify writes to the oplog advance cluster time. TEST_F(LogicalClockTest, WritesToOplogAdvanceClusterTime) { - Timestamp tX(1); + Timestamp tX(1, 0); auto initialTime = LogicalTime(tX); getClock()->setClusterTimeFromTrustedSource(initialTime); diff --git a/src/mongo/db/ops/SConscript b/src/mongo/db/ops/SConscript index 2254606f570..beadfc36549 100644 --- a/src/mongo/db/ops/SConscript +++ b/src/mongo/db/ops/SConscript @@ -5,6 +5,24 @@ Import("env") env = env.Clone() env.Library( + target='write_ops_exec', + source=[ + 'write_ops_exec.cpp', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/repl/oplog', + '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', + '$BUILD_DIR/mongo/util/fail_point', + '$BUILD_DIR/mongo/db/write_ops', + '$BUILD_DIR/mongo/db/curop', + '$BUILD_DIR/mongo/db/db_raii', + ], +) + + + +env.Library( target='update', source=[ 'modifier_add_to_set.cpp', diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index 666ca4efcd4..6db044b5d6f 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -298,11 +298,32 @@ WriteResult performCreateIndexes(OperationContext* opCtx, const write_ops::Inser void insertDocuments(OperationContext* opCtx, Collection* collection, - std::vector<InsertStatement>::const_iterator begin, - std::vector<InsertStatement>::const_iterator end) { + std::vector<InsertStatement>::iterator begin, + 
std::vector<InsertStatement>::iterator end) { // Intentionally not using writeConflictRetry. That is handled by the caller so it can react to // oversized batches. WriteUnitOfWork wuow(opCtx); + + // Acquire optimes and fill them in for each item in the batch. + // This must only be done for doc-locking storage engines, which are allowed to insert oplog + // documents out-of-timestamp-order. For other storage engines, the oplog entries must be + // physically written in timestamp order, so we defer optime assignment until the oplog is about + // to be written. + auto batchSize = std::distance(begin, end); + std::unique_ptr<OplogSlot[]> slots(new OplogSlot[batchSize]); + if (supportsDocLocking()) { + auto replCoord = repl::ReplicationCoordinator::get(opCtx); + if (!replCoord->isOplogDisabledFor(opCtx, collection->ns())) { + // Populate 'slots' with new optimes for each insert. + // This also notifies the storage engine of each new timestamp. + repl::getNextOpTimes(opCtx, batchSize, slots.get()); + OplogSlot* slot = slots.get(); + for (auto it = begin; it != end; it++) { + it->oplogSlot = *slot++; + } + } + } + uassertStatusOK(collection->insertDocuments( opCtx, begin, end, &CurOp::get(opCtx)->debug(), /*enforceQuota*/ true)); wuow.commit(); @@ -313,7 +334,7 @@ void insertDocuments(OperationContext* opCtx, */ bool insertBatchAndHandleErrors(OperationContext* opCtx, const write_ops::Insert& wholeOp, - const std::vector<InsertStatement>& batch, + std::vector<InsertStatement>& batch, LastOpFixer* lastOpFixer, WriteResult* out) { if (batch.empty()) diff --git a/src/mongo/db/repair_database.cpp b/src/mongo/db/repair_database.cpp index 552f4577f04..b8feba841c1 100644 --- a/src/mongo/db/repair_database.cpp +++ b/src/mongo/db/repair_database.cpp @@ -249,8 +249,11 @@ Status repairDatabase(OperationContext* opCtx, if (engine->isMmapV1()) { // MMAPv1 is a layering violation so it implements its own repairDatabase. 
- return static_cast<MMAPV1Engine*>(engine)->repairDatabase( + auto status = static_cast<MMAPV1Engine*>(engine)->repairDatabase( opCtx, dbName, preserveClonedFilesOnFailure, backupOriginalFiles); + // Restore oplog Collection pointer cache. + repl::acquireOplogCollectionForLogging(opCtx); + return status; } // These are MMAPv1 specific @@ -278,6 +281,9 @@ Status repairDatabase(OperationContext* opCtx, for (auto&& collection : *db) { collection->setMinimumVisibleSnapshot(snapshotName); } + + // Restore oplog Collection pointer cache. + repl::acquireOplogCollectionForLogging(opCtx); } catch (...) { severe() << "Unexpected exception encountered while reopening database after repair."; std::terminate(); // Logs additional info about the specific error. diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 4cf37c9f16e..0b1cf62ad90 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -9,25 +9,31 @@ env.Library( source=[ 'apply_ops.cpp', 'oplog.cpp', - 'oplogreader.cpp', ], - LIBDEPS=[ - 'dbcheck', - 'repl_coordinator_interface', - 'repl_coordinator_global', + LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/background', - '$BUILD_DIR/mongo/db/catalog/database', - '$BUILD_DIR/mongo/db/catalog/database_holder', - '$BUILD_DIR/mongo/db/concurrency/lock_manager', + '$BUILD_DIR/mongo/db/catalog/catalog_helpers', '$BUILD_DIR/mongo/db/db_raii', '$BUILD_DIR/mongo/db/dbdirectclient', '$BUILD_DIR/mongo/db/dbhelpers', '$BUILD_DIR/mongo/db/index_d', - '$BUILD_DIR/mongo/db/index/index_descriptor', - '$BUILD_DIR/mongo/db/write_ops', - '$BUILD_DIR/mongo/db/catalog/catalog_helpers', - '$BUILD_DIR/mongo/db/namespace_string', + 'dbcheck', + 'repl_coordinator_interface', + ], +) + +env.Library( + target='oplogreader', + source=[ + 'oplogreader.cpp', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/client/clientdriver', + '$BUILD_DIR/mongo/db/auth/authcommon', + 
'$BUILD_DIR/mongo/db/auth/authorization_manager_global', + '$BUILD_DIR/mongo/util/net/network', ], ) @@ -37,12 +43,12 @@ env.CppUnitTest( 'oplog_test.cpp', ], LIBDEPS=[ - 'oplog', - 'oplog_entry', - 'replmocks', '$BUILD_DIR/mongo/db/service_context_d_test_fixture', '$BUILD_DIR/mongo/unittest/concurrency', '$BUILD_DIR/mongo/util/concurrency/thread_pool', + 'oplog', + 'oplog_entry', + 'replmocks', ], ) @@ -54,6 +60,7 @@ env.Library( LIBDEPS=[ 'oplog', 'oplog_interface_remote', + 'oplogreader', 'repl_coordinator_interface', 'repl_coordinator_global', '$BUILD_DIR/mongo/base', @@ -475,6 +482,7 @@ env.Library( 'oplog', 'replication_process', 'roll_back_local_operations', + '$BUILD_DIR/mongo/db/catalog/catalog_helpers', '$BUILD_DIR/mongo/db/catalog/database_holder', '$BUILD_DIR/mongo/db/s/sharding', '$BUILD_DIR/mongo/db/write_ops', diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp index 2f438ec436c..edd114366a4 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -505,7 +505,7 @@ void InitialSyncer::_chooseSyncSourceCallback( // There is no need to schedule separate task to create oplog collection since we are already in // a callback and we are certain there's no existing operation context (required for creating // collections and dropping user databases) attached to the current thread. - status = _recreateOplogAndDropReplicatedDatabases(); + status = _truncateOplogAndDropReplicatedDatabases(); if (!status.isOK()) { onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); return; @@ -527,9 +527,9 @@ void InitialSyncer::_chooseSyncSourceCallback( _getBaseRollbackIdHandle = scheduleResult.getValue(); } -Status InitialSyncer::_recreateOplogAndDropReplicatedDatabases() { - // drop/create oplog; drop user databases. 
- LOG(1) << "About to drop+create the oplog, if it exists, ns:" << _opts.localOplogNS +Status InitialSyncer::_truncateOplogAndDropReplicatedDatabases() { + // truncate oplog; drop user databases. + LOG(1) << "About to truncate the oplog, if it exists, ns:" << _opts.localOplogNS << ", and drop all user databases (so that we can clone them)."; auto opCtx = makeOpCtx(); @@ -537,23 +537,21 @@ Status InitialSyncer::_recreateOplogAndDropReplicatedDatabases() { // We are not replicating nor validating these writes. UnreplicatedWritesBlock unreplicatedWritesBlock(opCtx.get()); - // 1.) Drop the oplog. - LOG(2) << "Dropping the existing oplog: " << _opts.localOplogNS; - auto status = _storage->dropCollection(opCtx.get(), _opts.localOplogNS); + // 1.) Truncate the oplog. + LOG(2) << "Truncating the existing oplog: " << _opts.localOplogNS; + auto status = _storage->truncateCollection(opCtx.get(), _opts.localOplogNS); if (!status.isOK()) { - return status; + // 1a.) Create the oplog. + LOG(2) << "Creating the oplog: " << _opts.localOplogNS; + status = _storage->createOplog(opCtx.get(), _opts.localOplogNS); + if (!status.isOK()) { + return status; + } } // 2.) Drop user databases. - LOG(2) << "Dropping user databases"; - status = _storage->dropReplicatedDatabases(opCtx.get()); - if (!status.isOK()) { - return status; - } - - // 3.) Create the oplog. 
- LOG(2) << "Creating the oplog: " << _opts.localOplogNS; - return _storage->createOplog(opCtx.get(), _opts.localOplogNS); + LOG(2) << "Dropping user databases"; + return _storage->dropReplicatedDatabases(opCtx.get()); } void InitialSyncer::_rollbackCheckerResetCallback( diff --git a/src/mongo/db/repl/initial_syncer.h b/src/mongo/db/repl/initial_syncer.h index 8ee406d8876..63095486232 100644 --- a/src/mongo/db/repl/initial_syncer.h +++ b/src/mongo/db/repl/initial_syncer.h @@ -278,7 +278,7 @@ private: * | * | * V - * _recreateOplogAndDropReplicatedDatabases() + * _truncateOplogAndDropReplicatedDatabases() * | * | * V @@ -368,11 +368,10 @@ private: /** * This function does the following: - * 1.) Drop oplog. + * 1.) Truncate oplog. * 2.) Drop user databases (replicated dbs). - * 3.) Create oplog. */ - Status _recreateOplogAndDropReplicatedDatabases(); + Status _truncateOplogAndDropReplicatedDatabases(); /** * Callback for rollback checker's first replSetGetRBID command before starting data cloning. 
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp index 274678a10bf..8d765cefc04 100644 --- a/src/mongo/db/repl/initial_syncer_test.cpp +++ b/src/mongo/db/repl/initial_syncer_test.cpp @@ -216,6 +216,7 @@ public: protected: struct StorageInterfaceResults { bool createOplogCalled = false; + bool truncateCalled = false; bool insertedOplogEntries = false; int oplogEntriesInserted = 0; bool droppedUserDBs = false; @@ -235,6 +236,12 @@ protected: _storageInterfaceWorkDone.createOplogCalled = true; return Status::OK(); }; + _storageInterface->truncateCollFn = [this](OperationContext* opCtx, + const NamespaceString& nss) { + LockGuard lock(_storageInterfaceWorkDoneMutex); + _storageInterfaceWorkDone.truncateCalled = true; + return Status::OK(); + }; _storageInterface->insertDocumentFn = [this]( OperationContext* opCtx, const NamespaceString& nss, const TimestampedBSONObj& doc) { LockGuard lock(_storageInterfaceWorkDoneMutex); @@ -921,14 +928,13 @@ TEST_F(InitialSyncerTest, InitialSyncerResetsOnCompletionCallbackFunctionPointer ASSERT_TRUE(sharedCallbackStateDestroyed); } -TEST_F(InitialSyncerTest, InitialSyncerRecreatesOplogAndDropsReplicatedDatabases) { - // We are not interested in proceeding beyond the oplog creation stage so we inject a failure - // after setting '_storageInterfaceWorkDone.createOplogCalled' to true. - auto oldCreateOplogFn = _storageInterface->createOplogFn; - _storageInterface->createOplogFn = [oldCreateOplogFn](OperationContext* opCtx, - const NamespaceString& nss) { - oldCreateOplogFn(opCtx, nss).transitional_ignore(); - return Status(ErrorCodes::OperationFailed, "oplog creation failed"); +TEST_F(InitialSyncerTest, InitialSyncerTruncatesOplogAndDropsReplicatedDatabases) { + // We are not interested in proceeding beyond the dropUserDB stage so we inject a failure + // after setting '_storageInterfaceWorkDone.droppedUserDBs' to true. 
+ auto oldDropUserDBsFn = _storageInterface->dropUserDBsFn; + _storageInterface->dropUserDBsFn = [oldDropUserDBsFn](OperationContext* opCtx) { + ASSERT_OK(oldDropUserDBsFn(opCtx)); + return Status(ErrorCodes::OperationFailed, "drop userdbs failed"); }; auto initialSyncer = &getInitialSyncer(); @@ -941,8 +947,8 @@ TEST_F(InitialSyncerTest, InitialSyncerRecreatesOplogAndDropsReplicatedDatabases ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied); LockGuard lock(_storageInterfaceWorkDoneMutex); + ASSERT_TRUE(_storageInterfaceWorkDone.truncateCalled); ASSERT_TRUE(_storageInterfaceWorkDone.droppedUserDBs); - ASSERT_TRUE(_storageInterfaceWorkDone.createOplogCalled); } TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetRollbackIdScheduleError) { @@ -973,13 +979,13 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetRollbackIdScheduleError) TEST_F( InitialSyncerTest, InitialSyncerReturnsShutdownInProgressIfSchedulingRollbackCheckerFailedDueToExecutorShutdown) { - // The rollback id request is sent immediately after oplog creation. We shut the task executor - // down before returning from createOplog() to make the scheduleRemoteCommand() call for + // The rollback id request is sent immediately after oplog truncation. We shut the task executor + // down before returning from truncate() to make the scheduleRemoteCommand() call for // replSetGetRBID fail. 
- auto oldCreateOplogFn = _storageInterface->createOplogFn; - _storageInterface->createOplogFn = [oldCreateOplogFn, this](OperationContext* opCtx, - const NamespaceString& nss) { - auto status = oldCreateOplogFn(opCtx, nss); + auto oldTruncateCollFn = _storageInterface->truncateCollFn; + _storageInterface->truncateCollFn = [oldTruncateCollFn, this](OperationContext* opCtx, + const NamespaceString& nss) { + auto status = oldTruncateCollFn(opCtx, nss); getExecutor().shutdown(); return status; }; @@ -994,7 +1000,7 @@ TEST_F( ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, _lastApplied); LockGuard lock(_storageInterfaceWorkDoneMutex); - ASSERT_TRUE(_storageInterfaceWorkDone.createOplogCalled); + ASSERT_TRUE(_storageInterfaceWorkDone.truncateCalled); } TEST_F(InitialSyncerTest, InitialSyncerCancelsRollbackCheckerOnShutdown) { diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 4508ef2621f..a3fc12b7adb 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -116,13 +116,17 @@ std::string masterSlaveOplogName = "local.oplog.$main"; MONGO_FP_DECLARE(disableSnapshotting); namespace { -// cached copy...so don't rename, drop, etc.!!! +/** + * The `_localOplogCollection` pointer is always valid (or null) because an + * operation must take the global exclusive lock to set the pointer to null when + * the Collection instance is destroyed. See `oplogCheckCloseDatabase`. + */ Collection* _localOplogCollection = nullptr; PseudoRandom hashGenerator(std::unique_ptr<SecureRandom>(SecureRandom::create())->nextInt64()); -// Synchronizes the section where a new Timestamp is generated and when it actually -// appears in the oplog. +// Synchronizes the section where a new Timestamp is generated and when it is registered in the +// storage engine. 
stdx::mutex newOpMutex; stdx::condition_variable newTimestampNotifier; // Remembers that last timestamp generated for creating new oplog entries or last timestamp of @@ -137,27 +141,16 @@ void checkOplogInsert(Status result) { massert(17322, str::stream() << "write to oplog failed: " << result.toString(), result.isOK()); } -struct OplogSlot { - OpTime opTime; - int64_t hash = 0; -}; - -/** - * Allocates an optime for a new entry in the oplog, and updates the replication coordinator to - * reflect that new optime. Returns the new optime and the correct value of the "h" field for - * the new oplog entry. - */ -void getNextOpTime(OperationContext* opCtx, - Collection* oplog, - ReplicationCoordinator* replCoord, - ReplicationCoordinator::Mode replicationMode, - unsigned count, - OplogSlot* slotsOut) { - synchronizeOnCappedInFlightResource(opCtx->lockState(), oplog->ns()); +void _getNextOpTimes(OperationContext* opCtx, + Collection* oplog, + std::size_t count, + OplogSlot* slotsOut) { + synchronizeOnOplogInFlightResource(opCtx->lockState()); + auto replCoord = ReplicationCoordinator::get(opCtx); long long term = OpTime::kUninitializedTerm; // Fetch term out of the newOpMutex. - if (replicationMode == ReplicationCoordinator::modeReplSet && + if (replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet && replCoord->isV1ElectionProtocol()) { // Current term. If we're not a replset of pv=1, it remains kOldProtocolVersionTerm. term = replCoord->getTerm(); @@ -172,8 +165,8 @@ void getNextOpTime(OperationContext* opCtx, fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(opCtx, ts)); // Set hash if we're in replset mode, otherwise it remains 0 in master/slave. 
- const bool needHash = (replicationMode == ReplicationCoordinator::modeReplSet); - for (unsigned i = 0; i < count; i++) { + const bool needHash = (replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet); + for (std::size_t i = 0; i < count; i++) { slotsOut[i].opTime = {Timestamp(ts.asULL() + i), term}; if (needHash) { slotsOut[i].hash = hashGenerator.nextInt64(); @@ -222,11 +215,18 @@ private: } // namespace void setOplogCollectionName() { - if (getGlobalReplicationCoordinator()->getReplicationMode() == - ReplicationCoordinator::modeReplSet) { - _oplogCollectionName = NamespaceString::kRsOplogNamespace.ns(); - } else { - _oplogCollectionName = masterSlaveOplogName; + switch (getGlobalReplicationCoordinator()->getReplicationMode()) { + case ReplicationCoordinator::modeReplSet: + _oplogCollectionName = NamespaceString::kRsOplogNamespace.ns(); + break; + case ReplicationCoordinator::modeMasterSlave: + _oplogCollectionName = masterSlaveOplogName; + break; + case ReplicationCoordinator::modeNone: + // leave empty. + break; + default: + MONGO_UNREACHABLE; } } @@ -275,21 +275,6 @@ void createIndexForApplyOps(OperationContext* opCtx, namespace { -Collection* getLocalOplogCollection(OperationContext* opCtx, - const std::string& oplogCollectionName) { - if (_localOplogCollection) - return _localOplogCollection; - - AutoGetCollection autoColl(opCtx, NamespaceString(oplogCollectionName), MODE_IX); - _localOplogCollection = autoColl.getCollection(); - massert(13347, - "the oplog collection " + oplogCollectionName + - " missing. did you drop it? if so, restart the server", - _localOplogCollection); - - return _localOplogCollection; -} - /** * Attaches the session information of a write to an oplog entry if it exists. 
*/ @@ -392,11 +377,9 @@ void _logOpsInner(OperationContext* opCtx, Timestamp* timestamps, size_t nDocs, Collection* oplogCollection, - ReplicationCoordinator::Mode replicationMode, OpTime finalOpTime) { - ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); - - if (nss.size() && replicationMode == ReplicationCoordinator::modeReplSet && + auto replCoord = ReplicationCoordinator::get(opCtx); + if (nss.size() && replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet && !replCoord->canAcceptWritesFor(opCtx, nss)) { severe() << "logOp() but can't accept write to collection " << nss.ns(); fassertFailed(17405); @@ -431,12 +414,11 @@ OpTime logOp(OperationContext* opCtx, Lock::DBLock lk(opCtx, NamespaceString::kLocalDb, MODE_IX); Lock::CollectionLock lock(opCtx->lockState(), _oplogCollectionName, MODE_IX); - auto const oplog = getLocalOplogCollection(opCtx, _oplogCollectionName); - auto const replMode = replCoord->getReplicationMode(); + auto const oplog = _localOplogCollection; OplogSlot slot; WriteUnitOfWork wuow(opCtx); - getNextOpTime(opCtx, oplog, replCoord, replMode, 1, &slot); + _getNextOpTimes(opCtx, oplog, 1, &slot); auto writer = _logOpWriter(opCtx, opstr, @@ -453,7 +435,7 @@ OpTime logOp(OperationContext* opCtx, oplogLink); const DocWriter* basePtr = &writer; auto timestamp = slot.opTime.getTimestamp(); - _logOpsInner(opCtx, nss, &basePtr, &timestamp, 1, oplog, replMode, slot.opTime); + _logOpsInner(opCtx, nss, &basePtr, &timestamp, 1, oplog, slot.opTime); wuow.commit(); return slot.opTime; } @@ -476,14 +458,11 @@ repl::OpTime logInsertOps(OperationContext* opCtx, const size_t count = end - begin; std::vector<OplogDocWriter> writers; writers.reserve(count); - Collection* oplog = getLocalOplogCollection(opCtx, _oplogCollectionName); + Collection* oplog = _localOplogCollection; Lock::DBLock lk(opCtx, "local", MODE_IX); Lock::CollectionLock lock(opCtx->lockState(), _oplogCollectionName, MODE_IX); - std::unique_ptr<OplogSlot[]> slots(new 
OplogSlot[count]); - auto replMode = replCoord->getReplicationMode(); WriteUnitOfWork wuow(opCtx); - getNextOpTime(opCtx, oplog, replCoord, replMode, count, slots.get()); auto wallTime = Date_t::now(); OplogLink oplogLink; @@ -496,39 +475,40 @@ repl::OpTime logInsertOps(OperationContext* opCtx, } auto timestamps = stdx::make_unique<Timestamp[]>(count); + OpTime lastOpTime; for (size_t i = 0; i < count; i++) { - auto insertStatement = begin[i]; + // Make a mutable copy. + auto insertStatementOplogSlot = begin[i].oplogSlot; + // Fetch optime now, if not already fetched. + if (insertStatementOplogSlot.opTime.isNull()) { + _getNextOpTimes(opCtx, oplog, 1, &insertStatementOplogSlot); + } writers.emplace_back(_logOpWriter(opCtx, "i", nss, uuid, - insertStatement.doc, + begin[i].doc, NULL, fromMigrate, - slots[i].opTime, - slots[i].hash, + insertStatementOplogSlot.opTime, + insertStatementOplogSlot.hash, wallTime, sessionInfo, - insertStatement.stmtId, + begin[i].stmtId, oplogLink)); - oplogLink.prevTs = slots[i].opTime.getTimestamp(); - timestamps[i] = slots[i].opTime.getTimestamp(); + oplogLink.prevTs = insertStatementOplogSlot.opTime.getTimestamp(); + timestamps[i] = oplogLink.prevTs; + lastOpTime = insertStatementOplogSlot.opTime; } std::unique_ptr<DocWriter const* []> basePtrs(new DocWriter const*[count]); for (size_t i = 0; i < count; i++) { basePtrs[i] = &writers[i]; } - _logOpsInner(opCtx, - nss, - basePtrs.get(), - timestamps.get(), - count, - oplog, - replMode, - slots[count - 1].opTime); + invariant(!lastOpTime.isNull()); + _logOpsInner(opCtx, nss, basePtrs.get(), timestamps.get(), count, oplog, lastOpTime); wuow.commit(); - return slots[count - 1].opTime; + return lastOpTime; } namespace { @@ -599,7 +579,7 @@ void createOplog(OperationContext* opCtx, const std::string& oplogCollectionName throw AssertionException(13257, ss.str()); } } - + acquireOplogCollectionForLogging(opCtx); if (!isReplSet) initTimestampFromOplog(opCtx, oplogCollectionName); return; @@ 
-619,6 +599,7 @@ void createOplog(OperationContext* opCtx, const std::string& oplogCollectionName writeConflictRetry(opCtx, "createCollection", oplogCollectionName, [&] { WriteUnitOfWork uow(opCtx); invariant(ctx.db()->createCollection(opCtx, oplogCollectionName, options)); + acquireOplogCollectionForLogging(opCtx); if (!isReplSet) getGlobalServiceContext()->getOpObserver()->onOpMessage(opCtx, BSONObj()); uow.commit(); @@ -627,6 +608,7 @@ void createOplog(OperationContext* opCtx, const std::string& oplogCollectionName /* sync here so we don't get any surprising lag later when we try to sync */ StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine(); storageEngine->flushAllFiles(opCtx, true); + log() << "******" << endl; } @@ -636,6 +618,14 @@ void createOplog(OperationContext* opCtx) { createOplog(opCtx, _oplogCollectionName, isReplSet); } +void getNextOpTimes(OperationContext* opCtx, std::size_t count, OplogSlot* slotsOut) { + // The local oplog collection pointer must already be established by this point. + // We can't establish it here because that would require locking the local database, which would + // be a lock order violation. 
+ invariant(_localOplogCollection); + _getNextOpTimes(opCtx, _localOplogCollection, count, slotsOut); +} + // ------------------------------------- namespace { @@ -1317,6 +1307,7 @@ void setNewTimestamp(ServiceContext* service, const Timestamp& newTime) { void initTimestampFromOplog(OperationContext* opCtx, const std::string& oplogNS) { DBDirectClient c(opCtx); + static const BSONObj reverseNaturalObj = BSON("$natural" << -1); BSONObj lastOp = c.findOne(oplogNS, Query().sort(reverseNaturalObj), NULL, QueryOption_SlaveOk); if (!lastOp.isEmpty()) { @@ -1328,8 +1319,18 @@ void initTimestampFromOplog(OperationContext* opCtx, const std::string& oplogNS) void oplogCheckCloseDatabase(OperationContext* opCtx, Database* db) { invariant(opCtx->lockState()->isW()); + if (db->name() == "local") { + _localOplogCollection = nullptr; + } +} + - _localOplogCollection = nullptr; +void acquireOplogCollectionForLogging(OperationContext* opCtx) { + if (!_oplogCollectionName.empty()) { + AutoGetCollection autoColl(opCtx, NamespaceString(_oplogCollectionName), MODE_IX); + _localOplogCollection = autoColl.getCollection(); + fassert(13347, _localOplogCollection); + } } void signalOplogWaiters() { @@ -1429,12 +1430,10 @@ void SnapshotThread::run() { SnapshotName name(0); // assigned real value in block. { - // Make sure there are no in-flight capped inserts while we create our snapshot. + // Make sure there are no in-flight oplog inserts while we create our snapshot. // This lock cannot be aquired until all writes holding the resource commit/abort. - Lock::ResourceLock cappedInsertLockForOtherDb( - opCtx->lockState(), resourceCappedInFlightForOtherDb, MODE_X); - Lock::ResourceLock cappedInsertLockForLocalDb( - opCtx->lockState(), resourceCappedInFlightForLocalDb, MODE_X); + Lock::ResourceLock cappedInsertLockForOplog( + opCtx->lockState(), resourceInFlightForOplog, MODE_X); // Reserve the name immediately before we take our snapshot. 
This ensures that all // names that compare lower must be from points in time visible to this named diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index 812efb81310..2198bb86c7e 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -34,10 +34,11 @@ #include "mongo/base/status.h" #include "mongo/bson/bsonobj.h" #include "mongo/bson/timestamp.h" -#include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/collection_options.h" +#include "mongo/db/logical_session_id.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/storage/snapshot_name.h" #include "mongo/stdx/functional.h" namespace mongo { @@ -48,6 +49,30 @@ class OperationContext; class OperationSessionInfo; class Session; +struct OplogSlot { + OplogSlot() {} + OplogSlot(repl::OpTime opTime, std::int64_t hash) : opTime(opTime), hash(hash) {} + repl::OpTime opTime; + std::int64_t hash = 0; +}; + +struct InsertStatement { +public: + InsertStatement() = default; + explicit InsertStatement(BSONObj toInsert) : doc(toInsert) {} + + InsertStatement(StmtId statementId, BSONObj toInsert) : stmtId(statementId), doc(toInsert) {} + InsertStatement(StmtId statementId, BSONObj toInsert, OplogSlot os) + : stmtId(statementId), oplogSlot(os), doc(toInsert) {} + InsertStatement(BSONObj toInsert, SnapshotName ts) + : oplogSlot(repl::OpTime(Timestamp(ts.asU64()), repl::OpTime::kUninitializedTerm), 0), + doc(toInsert) {} + + StmtId stmtId = kUninitializedStmtId; + OplogSlot oplogSlot; + BSONObj doc; +}; + namespace repl { class ReplSettings; @@ -118,10 +143,15 @@ OpTime logOp(OperationContext* opCtx, StmtId stmtId, const OplogLink& oplogLink); -// Flush out the cached pointers to the local database and oplog. +// Flush out the cached pointer to the oplog. // Used by the closeDatabase command to ensure we don't cache closed things. 
void oplogCheckCloseDatabase(OperationContext* opCtx, Database* db); +/** + * Establish the cached pointer to the local oplog. + */ +void acquireOplogCollectionForLogging(OperationContext* opCtx); + using IncrementOpsAppliedStatsFn = stdx::function<void()>; /** * Take the object field of a BSONObj, the BSONObj, and the namespace of @@ -184,5 +214,11 @@ void createIndexForApplyOps(OperationContext* opCtx, const NamespaceString& indexNss, IncrementOpsAppliedStatsFn incrementOpsAppliedStats); +/** + * Allocates optimes for new entries in the oplog. Returns an array of OplogSlots, which contain + * the new optimes along with their terms and newly calculated hash fields. + */ +void getNextOpTimes(OperationContext* opCtx, std::size_t count, OplogSlot* slotsOut); + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp index 744045df7f3..659e061c554 100644 --- a/src/mongo/db/repl/oplogreader.cpp +++ b/src/mongo/db/repl/oplogreader.cpp @@ -50,8 +50,6 @@ using std::string; namespace repl { -const BSONObj reverseNaturalObj = BSON("$natural" << -1); - bool replAuthenticate(DBClientBase* conn) { if (isInternalAuthSet()) return conn->authenticateInternalUser(); diff --git a/src/mongo/db/repl/oplogreader.h b/src/mongo/db/repl/oplogreader.h index 7307d14e406..92e224a6498 100644 --- a/src/mongo/db/repl/oplogreader.h +++ b/src/mongo/db/repl/oplogreader.h @@ -37,9 +37,6 @@ namespace mongo { namespace repl { -// {"$natural": -1 } -extern const BSONObj reverseNaturalObj; - /** * Authenticates conn using the server's cluster-membership credentials. 
* diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 06b78d50ef5..674ec4d5940 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -224,6 +224,9 @@ bool ReplicationCoordinatorExternalStateImpl::isInitialSyncFlagSet(OperationCont void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication( OperationContext* opCtx, ReplicationCoordinator* replCoord) { + // Initialize the cached pointer to the oplog collection, for writing to the oplog. + acquireOplogCollectionForLogging(opCtx); + LockGuard lk(_threadMutex); invariant(replCoord); invariant(!_bgSync); diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h index 840b40e2c16..309b1715aa0 100644 --- a/src/mongo/db/repl/storage_interface.h +++ b/src/mongo/db/repl/storage_interface.h @@ -144,11 +144,16 @@ public: const CollectionOptions& options) = 0; /** - * Drops a collection, like the oplog. + * Drops a collection. */ virtual Status dropCollection(OperationContext* opCtx, const NamespaceString& nss) = 0; /** + * Truncates a collection. + */ + virtual Status truncateCollection(OperationContext* opCtx, const NamespaceString& nss) = 0; + + /** * Renames a collection from the "fromNS" to the "toNS". Fails if the new collection already * exists. 
*/ diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index db090542ed6..e9f6b002cf6 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -411,10 +411,32 @@ Status StorageInterfaceImpl::dropCollection(OperationContext* opCtx, const Names } WriteUnitOfWork wunit(opCtx); const auto status = autoDB.getDb()->dropCollectionEvenIfSystem(opCtx, nss); - if (status.isOK()) { - wunit.commit(); + if (!status.isOK()) { + return status; } - return status; + wunit.commit(); + return Status::OK(); + }); +} + +Status StorageInterfaceImpl::truncateCollection(OperationContext* opCtx, + const NamespaceString& nss) { + return writeConflictRetry(opCtx, "StorageInterfaceImpl::truncateCollection", nss.ns(), [&] { + AutoGetCollection autoColl(opCtx, nss, MODE_X); + auto collectionResult = + getCollection(autoColl, nss, "The collection must exist before truncating."); + if (!collectionResult.isOK()) { + return collectionResult.getStatus(); + } + auto collection = collectionResult.getValue(); + + WriteUnitOfWork wunit(opCtx); + const auto status = collection->truncate(opCtx); + if (!status.isOK()) { + return status; + } + wunit.commit(); + return Status::OK(); }); } diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h index 2b8ba24f022..dfb367f398d 100644 --- a/src/mongo/db/repl/storage_interface_impl.h +++ b/src/mongo/db/repl/storage_interface_impl.h @@ -84,6 +84,8 @@ public: Status dropCollection(OperationContext* opCtx, const NamespaceString& nss) override; + Status truncateCollection(OperationContext* opCtx, const NamespaceString& nss) override; + Status renameCollection(OperationContext* opCtx, const NamespaceString& fromNS, const NamespaceString& toNS, diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h index a0cd570cf8d..18902950699 100644 --- 
a/src/mongo/db/repl/storage_interface_mock.h +++ b/src/mongo/db/repl/storage_interface_mock.h @@ -99,6 +99,8 @@ public: stdx::function<Status(OperationContext* opCtx, const NamespaceString& nss)>; using CreateCollectionFn = stdx::function<Status( OperationContext* opCtx, const NamespaceString& nss, const CollectionOptions& options)>; + using TruncateCollectionFn = + stdx::function<Status(OperationContext* opCtx, const NamespaceString& nss)>; using DropCollectionFn = stdx::function<Status(OperationContext* opCtx, const NamespaceString& nss)>; using FindDocumentsFn = @@ -168,6 +170,10 @@ public: return dropCollFn(opCtx, nss); }; + Status truncateCollection(OperationContext* opCtx, const NamespaceString& nss) override { + return truncateCollFn(opCtx, nss); + } + Status renameCollection(OperationContext* opCtx, const NamespaceString& fromNS, const NamespaceString& toNS, @@ -292,6 +298,9 @@ public: [](OperationContext* opCtx, const NamespaceString& nss, const CollectionOptions& options) { return Status{ErrorCodes::IllegalOperation, "CreateCollectionFn not implemented."}; }; + TruncateCollectionFn truncateCollFn = [](OperationContext* opCtx, const NamespaceString& nss) { + return Status{ErrorCodes::IllegalOperation, "TruncateCollectionFn not implemented."}; + }; DropCollectionFn dropCollFn = [](OperationContext* opCtx, const NamespaceString& nss) { return Status{ErrorCodes::IllegalOperation, "DropCollectionFn not implemented."}; }; diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp index 807cb94716e..0c7dc580452 100644 --- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp +++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp @@ -385,9 +385,15 @@ StatusWith<RecordId> EphemeralForTestRecordStore::extractAndCheckLocForOplog(con if (!status.isOK()) return status; - if (!_data->records.empty() && 
status.getValue() <= _data->records.rbegin()->first) - return StatusWith<RecordId>(ErrorCodes::BadValue, "ts not higher than highest"); - + if (!_data->records.empty() && status.getValue() <= _data->records.rbegin()->first) { + + return StatusWith<RecordId>(ErrorCodes::BadValue, + str::stream() << "attempted out-of-order oplog insert of " + << status.getValue() + << " (oplog last insert was " + << _data->records.rbegin()->first + << " )"); + } return status; } @@ -432,8 +438,8 @@ Status EphemeralForTestRecordStore::insertRecordsWithDocWriter(OperationContext* for (size_t i = 0; i < nDocs; i++) { const int len = docs[i]->documentSize(); if (_isCapped && len > _cappedMaxSize) { - // We use dataSize for capped rollover and we don't want to delete everything if we know - // this won't fit. + // We use dataSize for capped rollover and we don't want to delete everything if we + // know this won't fit. return Status(ErrorCodes::BadValue, "object to insert exceeds cappedMaxSize"); } |