diff options
author | Judah Schvimer <judah@mongodb.com> | 2018-04-24 16:37:18 -0400 |
---|---|---|
committer | Judah Schvimer <judah@mongodb.com> | 2018-05-03 12:01:09 -0400 |
commit | dd45579fef2bf4e212161d1e3d97666a80b08bbf (patch) | |
tree | b12df773845f5994240d8c50b0e171f277beb12a /src/mongo/db | |
parent | b1a7cffd3f151efe4d47864c89cf5c7cf9e884cc (diff) | |
download | mongo-dd45579fef2bf4e212161d1e3d97666a80b08bbf.tar.gz |
SERVER-34580 Plumb commit time to commit handlers when available
Diffstat (limited to 'src/mongo/db')
44 files changed, 448 insertions, 167 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index f33b74d7625..fdb33e83840 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -869,13 +869,13 @@ env.Library( '$BUILD_DIR/mongo/db/catalog/database', '$BUILD_DIR/mongo/db/storage/mmap_v1/repair_database_interface', 'background', + 'logical_clock', ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/catalog/document_validation', '$BUILD_DIR/mongo/db/catalog/index_create', '$BUILD_DIR/mongo/db/catalog/index_key_validate', '$BUILD_DIR/mongo/db/repl/oplog', - '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', ], ) @@ -1542,6 +1542,7 @@ env.Library( 'logical_clock.cpp', ], LIBDEPS=[ + 'global_settings', 'logical_time', 'server_parameters', 'service_context', diff --git a/src/mongo/db/auth/authz_manager_external_state_local.cpp b/src/mongo/db/auth/authz_manager_external_state_local.cpp index 56cd2e2969f..2a35c3a2ffe 100644 --- a/src/mongo/db/auth/authz_manager_external_state_local.cpp +++ b/src/mongo/db/auth/authz_manager_external_state_local.cpp @@ -539,7 +539,7 @@ public: _isO2Set(o2 ? true : false), _o2(_isO2Set ? o2->getOwned() : BSONObj()) {} - virtual void commit() { + virtual void commit(boost::optional<Timestamp>) { stdx::lock_guard<stdx::mutex> lk(_externalState->_roleGraphMutex); Status status = _externalState->_roleGraph.handleLogOp( _opCtx, _op.c_str(), _nss, _o, _isO2Set ? &_o2 : NULL); diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp index 0c8eaa0e7d9..253abd2f73e 100644 --- a/src/mongo/db/catalog/collection_impl.cpp +++ b/src/mongo/db/catalog/collection_impl.cpp @@ -328,7 +328,8 @@ Status CollectionImpl::insertDocumentsForOplog(OperationContext* opCtx, if (!status.isOK()) return status; - opCtx->recoveryUnit()->onCommit([this]() { notifyCappedWaitersIfNeeded(); }); + opCtx->recoveryUnit()->onCommit( + [this](boost::optional<Timestamp>) { notifyCappedWaitersIfNeeded(); }); return status; } @@ -380,7 +381,8 @@ Status CollectionImpl::insertDocuments(OperationContext* opCtx, getGlobalServiceContext()->getOpObserver()->onInserts( opCtx, ns(), uuid(), begin, end, fromMigrate); - opCtx->recoveryUnit()->onCommit([this]() { notifyCappedWaitersIfNeeded(); }); + opCtx->recoveryUnit()->onCommit( + [this](boost::optional<Timestamp>) { notifyCappedWaitersIfNeeded(); }); return Status::OK(); } @@ -449,7 +451,8 @@ Status CollectionImpl::insertDocument(OperationContext* opCtx, getGlobalServiceContext()->getOpObserver()->onInserts( opCtx, ns(), uuid(), inserts.begin(), inserts.end(), false); - opCtx->recoveryUnit()->onCommit([this]() { notifyCappedWaitersIfNeeded(); }); + opCtx->recoveryUnit()->onCommit( + [this](boost::optional<Timestamp>) { notifyCappedWaitersIfNeeded(); }); return loc.getStatus(); } diff --git a/src/mongo/db/catalog/database_impl.cpp b/src/mongo/db/catalog/database_impl.cpp index 4d195d3cf58..fea411a7177 100644 --- a/src/mongo/db/catalog/database_impl.cpp +++ b/src/mongo/db/catalog/database_impl.cpp @@ -111,16 +111,16 @@ public: AddCollectionChange(OperationContext* opCtx, DatabaseImpl* db, StringData ns) : _opCtx(opCtx), _db(db), _ns(ns.toString()) {} - virtual void commit() { + virtual void commit(boost::optional<Timestamp> commitTime) { CollectionMap::const_iterator it = _db->_collections.find(_ns); if (it == _db->_collections.end()) return; // Ban reading from this collection on committed reads on snapshots before now. - auto replCoord = repl::ReplicationCoordinator::get(_opCtx); - auto snapshotName = replCoord->getMinimumVisibleSnapshot(_opCtx); - it->second->setMinimumVisibleSnapshot(snapshotName); + if (commitTime) { + it->second->setMinimumVisibleSnapshot(commitTime.get()); + } } virtual void rollback() { @@ -143,7 +143,7 @@ public: // Takes ownership of coll (but not db). RemoveCollectionChange(DatabaseImpl* db, Collection* coll) : _db(db), _coll(coll) {} - virtual void commit() { + virtual void commit(boost::optional<Timestamp>) { delete _coll; } diff --git a/src/mongo/db/catalog/index_catalog_entry_impl.cpp b/src/mongo/db/catalog/index_catalog_entry_impl.cpp index 1ef395b85b4..7c5f71a1b7f 100644 --- a/src/mongo/db/catalog/index_catalog_entry_impl.cpp +++ b/src/mongo/db/catalog/index_catalog_entry_impl.cpp @@ -184,7 +184,7 @@ class IndexCatalogEntryImpl::SetHeadChange : public RecoveryUnit::Change { public: SetHeadChange(IndexCatalogEntryImpl* ice, RecordId oldHead) : _ice(ice), _oldHead(oldHead) {} - virtual void commit() {} + virtual void commit(boost::optional<Timestamp>) {} virtual void rollback() { _ice->_head = _oldHead; } @@ -273,22 +273,23 @@ void IndexCatalogEntryImpl::setMultikey(OperationContext* opCtx, // When the recovery unit commits, update the multikey paths if needed and clear the plan cache // if the index metadata has changed. - opCtx->recoveryUnit()->onCommit([this, multikeyPaths, indexMetadataHasChanged] { - _isMultikey.store(true); - - if (_indexTracksPathLevelMultikeyInfo) { - stdx::lock_guard<stdx::mutex> lk(_indexMultikeyPathsMutex); - for (size_t i = 0; i < multikeyPaths.size(); ++i) { - _indexMultikeyPaths[i].insert(multikeyPaths[i].begin(), multikeyPaths[i].end()); + opCtx->recoveryUnit()->onCommit( + [this, multikeyPaths, indexMetadataHasChanged](boost::optional<Timestamp>) { + _isMultikey.store(true); + + if (_indexTracksPathLevelMultikeyInfo) { + stdx::lock_guard<stdx::mutex> lk(_indexMultikeyPathsMutex); + for (size_t i = 0; i < multikeyPaths.size(); ++i) { + _indexMultikeyPaths[i].insert(multikeyPaths[i].begin(), multikeyPaths[i].end()); + } } - } - if (indexMetadataHasChanged && _infoCache) { - LOG(1) << _ns << ": clearing plan cache - index " << _descriptor->keyPattern() - << " set to multi key."; - _infoCache->clearQueryCache(); - } - }); + if (indexMetadataHasChanged && _infoCache) { + LOG(1) << _ns << ": clearing plan cache - index " << _descriptor->keyPattern() + << " set to multi key."; + _infoCache->clearQueryCache(); + } + }); } // ---- diff --git a/src/mongo/db/catalog/index_catalog_impl.cpp b/src/mongo/db/catalog/index_catalog_impl.cpp index 65cfa0dbb1c..572b54e4a05 100644 --- a/src/mongo/db/catalog/index_catalog_impl.cpp +++ b/src/mongo/db/catalog/index_catalog_impl.cpp @@ -54,6 +54,7 @@ #include "mongo/db/index_names.h" #include "mongo/db/jsobj.h" #include "mongo/db/keypattern.h" +#include "mongo/db/logical_clock.h" #include "mongo/db/matcher/expression.h" #include "mongo/db/operation_context.h" #include "mongo/db/ops/delete.h" @@ -61,7 +62,6 @@ #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/repl/replication_coordinator.h" -#include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/server_options.h" #include "mongo/db/service_context.h" #include "mongo/util/assert_util.h" @@ -425,12 +425,13 @@ Status IndexCatalogImpl::IndexBuildBlock::init() { if (isBackgroundIndex) { _opCtx->recoveryUnit()->onCommit( - [ opCtx = _opCtx, entry = _entry, collection = _collection ] { + [ opCtx = _opCtx, entry = _entry, collection = _collection ]( + boost::optional<Timestamp> commitTime) { // This will prevent the unfinished index from being visible on index iterators. - auto minVisible = - repl::ReplicationCoordinator::get(opCtx)->getMinimumVisibleSnapshot(opCtx); - entry->setMinimumVisibleSnapshot(minVisible); - collection->setMinimumVisibleSnapshot(minVisible); + if (commitTime) { + entry->setMinimumVisibleSnapshot(commitTime.get()); + collection->setMinimumVisibleSnapshot(commitTime.get()); + } }); } @@ -481,19 +482,24 @@ void IndexCatalogImpl::IndexBuildBlock::success() { OperationContext* opCtx = _opCtx; LOG(2) << "marking index " << _indexName << " as ready in snapshot id " << opCtx->recoveryUnit()->getSnapshotId(); - _opCtx->recoveryUnit()->onCommit([opCtx, entry, collection] { - // Note: this runs after the WUOW commits but before we release our X lock on the - // collection. This means that any snapshot created after this must include the full index, - // and no one can try to read this index before we set the visibility. - auto replCoord = repl::ReplicationCoordinator::get(opCtx); - auto snapshotName = replCoord->getMinimumVisibleSnapshot(opCtx); - entry->setMinimumVisibleSnapshot(snapshotName); - - // TODO remove this once SERVER-20439 is implemented. It is a stopgap solution for - // SERVER-20260 to make sure that reads with majority readConcern level can see indexes that - // are created with w:majority by making the readers block. - collection->setMinimumVisibleSnapshot(snapshotName); - }); + _opCtx->recoveryUnit()->onCommit( + [opCtx, entry, collection](boost::optional<Timestamp> commitTime) { + // Note: this runs after the WUOW commits but before we release our X lock on the + // collection. This means that any snapshot created after this must include the full + // index, and no one can try to read this index before we set the visibility. + if (!commitTime) { + // The end of background index builds on secondaries does not get a commit + // timestamp. We use the cluster time since it's guaranteed to be greater than the + // time of the index build. It is possible the cluster time could be in the future, + // and we will need to do another write to reach the minimum visible snapshot. + commitTime = LogicalClock::getClusterTimeForReplicaSet(opCtx).asTimestamp(); + } + entry->setMinimumVisibleSnapshot(commitTime.get()); + // We must also set the minimum visible snapshot on the collection like during init(). + // This prevents reads in the past from reading inconsistent metadata. We should be + // able to remove this when the catalog is versioned. + collection->setMinimumVisibleSnapshot(commitTime.get()); + }); entry->setIsReady(true); } @@ -983,11 +989,16 @@ public: IndexCatalogEntry* entry) : _opCtx(opCtx), _collection(collection), _entries(entries), _entry(entry) {} - void commit() final { + void commit(boost::optional<Timestamp> commitTime) final { // Ban reading from this collection on committed reads on snapshots before now. - auto replCoord = repl::ReplicationCoordinator::get(_opCtx); - auto snapshotName = replCoord->getMinimumVisibleSnapshot(_opCtx); - _collection->setMinimumVisibleSnapshot(snapshotName); + if (!commitTime) { + // This is called when we refresh the index catalog entry, which does not always have + // a commit timestamp. We use the cluster time since it's guaranteed to be greater than + // the time of the index removal. It is possible the cluster time could be in the + // future, and we will need to do another write to reach the minimum visible snapshot. + commitTime = LogicalClock::getClusterTimeForReplicaSet(_opCtx).asTimestamp(); + } + _collection->setMinimumVisibleSnapshot(commitTime.get()); delete _entry; } diff --git a/src/mongo/db/catalog/index_create_impl.cpp b/src/mongo/db/catalog/index_create_impl.cpp index aa04c84b8e9..2a39729f3db 100644 --- a/src/mongo/db/catalog/index_create_impl.cpp +++ b/src/mongo/db/catalog/index_create_impl.cpp @@ -133,7 +133,7 @@ class MultiIndexBlockImpl::SetNeedToCleanupOnRollback : public RecoveryUnit::Cha public: explicit SetNeedToCleanupOnRollback(MultiIndexBlockImpl* indexer) : _indexer(indexer) {} - virtual void commit() {} + virtual void commit(boost::optional<Timestamp>) {} virtual void rollback() { _indexer->_needToCleanup = true; } @@ -151,7 +151,7 @@ class MultiIndexBlockImpl::CleanupIndexesVectorOnRollback : public RecoveryUnit: public: explicit CleanupIndexesVectorOnRollback(MultiIndexBlockImpl* indexer) : _indexer(indexer) {} - virtual void commit() {} + virtual void commit(boost::optional<Timestamp>) {} virtual void rollback() { _indexer->_indexes.clear(); } diff --git a/src/mongo/db/catalog/uuid_catalog.cpp b/src/mongo/db/catalog/uuid_catalog.cpp index 0a50cadfa71..7b43be4f1b1 100644 --- a/src/mongo/db/catalog/uuid_catalog.cpp +++ b/src/mongo/db/catalog/uuid_catalog.cpp @@ -127,7 +127,7 @@ void UUIDCatalog::onRenameCollection(OperationContext* opCtx, GetNewCollectionFunction getNewCollection, CollectionUUID uuid) { Collection* oldColl = removeUUIDCatalogEntry(uuid); - opCtx->recoveryUnit()->onCommit([this, getNewCollection, uuid] { + opCtx->recoveryUnit()->onCommit([this, getNewCollection, uuid](boost::optional<Timestamp>) { // Reset current UUID entry in case some other operation updates the UUID catalog before the // WUOW is committed. registerUUIDCatalogEntry() is a no-op if there's an existing UUID // entry. diff --git a/src/mongo/db/commands/drop_indexes.cpp b/src/mongo/db/commands/drop_indexes.cpp index f6ebf5ec180..8d8fc318eff 100644 --- a/src/mongo/db/commands/drop_indexes.cpp +++ b/src/mongo/db/commands/drop_indexes.cpp @@ -51,8 +51,8 @@ #include "mongo/db/dbdirectclient.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/index_builder.h" +#include "mongo/db/logical_clock.h" #include "mongo/db/op_observer.h" -#include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/service_context.h" #include "mongo/util/log.h" @@ -218,9 +218,8 @@ public: // This was also done when dropAllIndexes() committed, but we need to ensure that no one // tries to read in the intermediate state where all indexes are newer than the current // snapshot so are unable to be used. - auto replCoord = repl::ReplicationCoordinator::get(opCtx); - auto snapshotName = replCoord->getMinimumVisibleSnapshot(opCtx); - collection->setMinimumVisibleSnapshot(snapshotName); + auto clusterTime = LogicalClock::getClusterTimeForReplicaSet(opCtx).asTimestamp(); + collection->setMinimumVisibleSnapshot(clusterTime); result.append("nIndexes", static_cast<int>(swIndexesToRebuild.getValue().size())); result.append("indexes", swIndexesToRebuild.getValue()); diff --git a/src/mongo/db/commands/feature_compatibility_version.cpp b/src/mongo/db/commands/feature_compatibility_version.cpp index c33031abc8d..c9cacca5f32 100644 --- a/src/mongo/db/commands/feature_compatibility_version.cpp +++ b/src/mongo/db/commands/feature_compatibility_version.cpp @@ -159,7 +159,7 @@ void FeatureCompatibilityVersion::onInsertOrUpdate(OperationContext* opCtx, cons << FeatureCompatibilityVersionParser::toString(newVersion); } - opCtx->recoveryUnit()->onCommit([opCtx, newVersion]() { + opCtx->recoveryUnit()->onCommit([opCtx, newVersion](boost::optional<Timestamp>) { serverGlobalParams.featureCompatibility.setVersion(newVersion); updateMinWireVersion(); diff --git a/src/mongo/db/commands/snapshot_management.cpp b/src/mongo/db/commands/snapshot_management.cpp index 7059f937318..9a9084b8723 100644 --- a/src/mongo/db/commands/snapshot_management.cpp +++ b/src/mongo/db/commands/snapshot_management.cpp @@ -35,6 +35,7 @@ #include "mongo/db/commands.h" #include "mongo/db/commands/test_commands_enabled.h" #include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/logical_clock.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/service_context.h" @@ -79,8 +80,7 @@ public: Lock::GlobalLock lk(opCtx, MODE_IX); - const auto name = - repl::ReplicationCoordinator::get(opCtx)->getMinimumVisibleSnapshot(opCtx); + auto name = LogicalClock::getClusterTimeForReplicaSet(opCtx).asTimestamp(); result.append("name", static_cast<long long>(name.asULL())); return CommandHelpers::appendCommandStatus(result, Status::OK()); diff --git a/src/mongo/db/logical_clock.cpp b/src/mongo/db/logical_clock.cpp index a9b7cd62485..efa2d55831b 100644 --- a/src/mongo/db/logical_clock.cpp +++ b/src/mongo/db/logical_clock.cpp @@ -33,6 +33,7 @@ #include "mongo/db/logical_clock.h" #include "mongo/base/status.h" +#include "mongo/db/global_settings.h" #include "mongo/db/operation_context.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" @@ -76,6 +77,14 @@ bool lessThanOrEqualToMaxPossibleTime(LogicalTime time, uint64_t nTicks) { } } +LogicalTime LogicalClock::getClusterTimeForReplicaSet(OperationContext* opCtx) { + if (getGlobalReplSettings().usingReplSets()) { + return get(opCtx)->getClusterTime(); + } + + return {}; +} + LogicalClock* LogicalClock::get(ServiceContext* service) { return getLogicalClock(service).get(); } diff --git a/src/mongo/db/logical_clock.h b/src/mongo/db/logical_clock.h index fc662eb78ad..e0f5c135c1c 100644 --- a/src/mongo/db/logical_clock.h +++ b/src/mongo/db/logical_clock.h @@ -52,6 +52,12 @@ public: Seconds(365 * 24 * 60 * 60); // 1 year /** + * Returns the current cluster time if this is a replica set node, otherwise returns a null + * logical time. + */ + static LogicalTime getClusterTimeForReplicaSet(OperationContext* opCtx); + + /** * Creates an instance of LogicalClock. */ LogicalClock(ServiceContext*); diff --git a/src/mongo/db/repair_database.cpp b/src/mongo/db/repair_database.cpp index 7c106ec1d62..df89ce310c6 100644 --- a/src/mongo/db/repair_database.cpp +++ b/src/mongo/db/repair_database.cpp @@ -50,7 +50,7 @@ #include "mongo/db/catalog/namespace_uuid_cache.h" #include "mongo/db/catalog/uuid_catalog.h" #include "mongo/db/index/index_descriptor.h" -#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/logical_clock.h" #include "mongo/db/storage/mmap_v1/repair_database_interface.h" #include "mongo/db/storage/storage_engine.h" #include "mongo/util/log.h" @@ -278,11 +278,10 @@ Status repairDatabase(OperationContext* opCtx, // Set the minimum snapshot for all Collections in this db. This ensures that readers // using majority readConcern level can only use the collections after their repaired // versions are in the committed view. - auto replCoord = repl::ReplicationCoordinator::get(opCtx); - auto snapshotName = replCoord->getMinimumVisibleSnapshot(opCtx); + auto clusterTime = LogicalClock::getClusterTimeForReplicaSet(opCtx).asTimestamp(); for (auto&& collection : *db) { - collection->setMinimumVisibleSnapshot(snapshotName); + collection->setMinimumVisibleSnapshot(clusterTime); } // Restore oplog Collection pointer cache. diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 0a314399d77..32ffed83e67 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -391,32 +391,41 @@ void _logOpsInner(OperationContext* opCtx, checkOplogInsert(oplogCollection->insertDocumentsForOplog(opCtx, writers, timestamps, nDocs)); // Set replCoord last optime only after we're sure the WUOW didn't abort and roll back. - opCtx->recoveryUnit()->onCommit([opCtx, replCoord, finalOpTime] { - - auto lastAppliedTimestamp = finalOpTime.getTimestamp(); - const auto storageEngine = opCtx->getServiceContext()->getGlobalStorageEngine(); - if (storageEngine->supportsDocLocking()) { - // If the storage engine supports document level locking, then it is possible for - // oplog writes to commit out of order. In that case, we only want to set our last - // applied optime to the all committed timestamp to ensure that all operations earlier - // than the last applied optime have been storage-committed. We are guaranteed that - // whatever operation occurred at the all committed timestamp occurred during the same - // term as 'finalOpTime'. When a primary enters a new term, it first commits a - // 'new primary' oplog entry in the new term before accepting any new writes. This - // will ensure that the all committed timestamp is in the new term before any client - // writes are committed. - lastAppliedTimestamp = storageEngine->getAllCommittedTimestamp(opCtx); - } + opCtx->recoveryUnit()->onCommit( + [opCtx, replCoord, finalOpTime](boost::optional<Timestamp> commitTime) { + if (commitTime) { + // The `finalOpTime` may be less than the `commitTime` if multiple oplog entries + // are logging within one WriteUnitOfWork. + invariant(finalOpTime.getTimestamp() <= *commitTime, + str::stream() << "Final OpTime: " << finalOpTime.toString() + << ". Commit Time: " + << commitTime->toString()); + } + + auto lastAppliedTimestamp = finalOpTime.getTimestamp(); + const auto storageEngine = opCtx->getServiceContext()->getGlobalStorageEngine(); + if (storageEngine->supportsDocLocking()) { + // If the storage engine supports document level locking, then it is possible for + // oplog writes to commit out of order. In that case, we only want to set our last + // applied optime to the all committed timestamp to ensure that all operations + // earlier than the last applied optime have been storage-committed. We are + // guaranteed that whatever operation occurred at the all committed timestamp + // occurred during the same term as 'finalOpTime'. When a primary enters a new term, + // it first commits a 'new primary' oplog entry in the new term before accepting any + // new writes. This will ensure that the all committed timestamp is in the new term + // before any client writes are committed. + lastAppliedTimestamp = storageEngine->getAllCommittedTimestamp(opCtx); + } - // Optimes on the primary should always represent consistent database states. - replCoord->setMyLastAppliedOpTimeForward( - OpTime(lastAppliedTimestamp, finalOpTime.getTerm()), - ReplicationCoordinator::DataConsistency::Consistent); + // Optimes on the primary should always represent consistent database states. + replCoord->setMyLastAppliedOpTimeForward( + OpTime(lastAppliedTimestamp, finalOpTime.getTerm()), + ReplicationCoordinator::DataConsistency::Consistent); - // We set the last op on the client to 'finalOpTime', because that contains the timestamp - // of the operation that the client actually performed. - ReplClientInfo::forClient(opCtx->getClient()).setLastOp(finalOpTime); - }); + // We set the last op on the client to 'finalOpTime', because that contains the + // timestamp of the operation that the client actually performed. + ReplClientInfo::forClient(opCtx->getClient()).setLastOp(finalOpTime); + }); } OpTime logOp(OperationContext* opCtx, diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index 7b6b77beb5f..c286c37f71e 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -761,21 +761,6 @@ public: virtual Status updateTerm(OperationContext* opCtx, long long term) = 0; /** - * Returns the minimum visible snapshot for this operation. - * - * This name is guaranteed to compare > all names reserved before and < all names reserved - * after. - * - * This method will not take any locks or attempt to access storage using the passed-in - * OperationContext. It will only be used to return reserved SnapshotNames by each operation so - * callers can correctly wait for the reserved snapshot to be visible. - * - * A null OperationContext can be used in cases where the snapshot to wait for should not be - * adjusted. - */ - virtual Timestamp getMinimumVisibleSnapshot(OperationContext* opCtx) = 0; - - /** * Blocks until either the current committed snapshot is at least as high as 'untilSnapshot', * or we are interrupted for any reason, including shutdown or maxTimeMs expiration. * 'opCtx' is used to checkForInterrupt and enforce maxTimeMS. diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 34b40057bd8..d0d46498f1f 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -3358,27 +3358,6 @@ EventHandle ReplicationCoordinatorImpl::_updateTerm_inlock( return EventHandle(); } -Timestamp ReplicationCoordinatorImpl::getMinimumVisibleSnapshot(OperationContext* opCtx) { - Timestamp reservedName; - if (getReplicationMode() == Mode::modeReplSet) { - invariant(opCtx->lockState()->isLocked()); - if (getMemberState().primary() || opCtx->recoveryUnit()->getCommitTimestamp().isNull()) { - // Use the current optime on the node, for primary nodes. Additionally, completion of - // background index builds on secondaries will not have a `commit time` and must also - // use the current optime. - reservedName = LogicalClock::get(getServiceContext())->getClusterTime().asTimestamp(); - } else { - // This function is only called when applying command operations on secondaries. - // We ask the RecoveryUnit what timestamp it will assign to this write. - reservedName = opCtx->recoveryUnit()->getCommitTimestamp(); - } - } else { - // All snapshots are the same for a standalone node. - reservedName = Timestamp(); - } - return reservedName; -} - void ReplicationCoordinatorImpl::waitUntilSnapshotCommitted(OperationContext* opCtx, const Timestamp& untilSnapshot) { stdx::unique_lock<stdx::mutex> lock(_mutex); diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 36a03adda66..9db6442ca68 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -288,8 +288,6 @@ public: virtual Status updateTerm(OperationContext* opCtx, long long term) override; - virtual Timestamp getMinimumVisibleSnapshot(OperationContext* opCtx) override; - virtual OpTime getCurrentCommittedSnapshotOpTime() const override; virtual void waitUntilSnapshotCommitted(OperationContext* opCtx, diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 437aeea1862..cc8aca366e2 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -442,10 +442,6 @@ Status ReplicationCoordinatorMock::updateTerm(OperationContext* opCtx, long long return Status::OK(); } -Timestamp ReplicationCoordinatorMock::getMinimumVisibleSnapshot(OperationContext* opCtx) { - return Timestamp(_snapshotNameGenerator.addAndFetch(1)); -} - void ReplicationCoordinatorMock::dropAllSnapshots() {} OpTime ReplicationCoordinatorMock::getCurrentCommittedSnapshotOpTime() const { diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index fd5eb00d58c..973c2afb15b 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -241,8 +241,6 @@ public: virtual Status updateTerm(OperationContext* opCtx, long long term); - virtual Timestamp getMinimumVisibleSnapshot(OperationContext* opCtx) override; - virtual void dropAllSnapshots() override; virtual OpTime getCurrentCommittedSnapshotOpTime() const override; diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index b94279b35d6..be6aafbe27f 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -146,7 +146,7 @@ public: _opTime(opTime), _prePostImageOpTime(prePostImageOpTime) {} - void commit() override { + void commit(boost::optional<Timestamp>) override { switch (_op) { case 'd': { stdx::lock_guard<stdx::mutex> sl(_cloner->_mutex); diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp index a7bc335294d..51c9fe954c7 100644 --- a/src/mongo/db/s/shard_server_op_observer.cpp +++ b/src/mongo/db/s/shard_server_op_observer.cpp @@ -69,7 +69,7 @@ public: CollectionVersionLogOpHandler(OperationContext* opCtx, const NamespaceString& nss) : _opCtx(opCtx), _nss(nss) {} - void commit() override { + void commit(boost::optional<Timestamp>) override { invariant(_opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); CatalogCacheLoader::get(_opCtx).notifyOfCollectionVersionUpdate(_nss); @@ -95,7 +95,7 @@ public: ShardIdentityLogOpHandler(OperationContext* opCtx, ShardIdentityType shardIdentity) : _opCtx(opCtx), _shardIdentity(std::move(shardIdentity)) {} - void commit() override { + void commit(boost::optional<Timestamp>) override { fassertNoTrace( 40071, ShardingState::get(_opCtx)->initializeFromShardIdentity(_opCtx, _shardIdentity)); } diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index 84021db98aa..7d129b164ed 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -1128,7 +1128,8 @@ void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx, std::vector<StmtId> stmtIdsWritten, const repl::OpTime& lastStmtIdWriteOpTime) { opCtx->recoveryUnit()->onCommit( - [ this, newTxnNumber, stmtIdsWritten = std::move(stmtIdsWritten), lastStmtIdWriteOpTime ] { + [ this, newTxnNumber, stmtIdsWritten = std::move(stmtIdsWritten), lastStmtIdWriteOpTime ]( + boost::optional<Timestamp>) { RetryableWritesStats::get(getGlobalServiceContext()) ->incrementTransactionsCollectionWriteCount(); diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_btree_impl.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_btree_impl.cpp index 4488c730431..3d212b09900 100644 --- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_btree_impl.cpp +++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_btree_impl.cpp @@ -486,7 +486,7 @@ private: IndexChange(IndexSet* data, const IndexKeyEntry& entry, bool insert) : _data(data), _entry(entry), _insert(insert) {} - virtual void commit() {} + virtual void commit(boost::optional<Timestamp>) {} virtual void rollback() { if (_insert) _data->erase(_entry); diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp index 5885244528b..f2ed2f99de2 100644 --- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp +++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp @@ -51,7 +51,7 @@ class EphemeralForTestRecordStore::InsertChange : public RecoveryUnit::Change { public: InsertChange(OperationContext* opCtx, Data* data, RecordId loc) : _opCtx(opCtx), _data(data), _loc(loc) {} - virtual void commit() {} + virtual void commit(boost::optional<Timestamp>) {} virtual void rollback() { stdx::lock_guard<stdx::recursive_mutex> lock(_data->recordsMutex); @@ -77,7 +77,7 @@ public: const EphemeralForTestRecord& rec) : _opCtx(opCtx), _data(data), _loc(loc), _rec(rec) {} - virtual void commit() {} + virtual void commit(boost::optional<Timestamp>) {} virtual void rollback() { stdx::lock_guard<stdx::recursive_mutex> lock(_data->recordsMutex); @@ -107,7 +107,7 @@ public: swap(_records, _data->records); } - virtual void commit() {} + virtual void commit(boost::optional<Timestamp>) {} virtual void rollback() { using std::swap; diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp index 97471b21ca2..e213362ffee 100644 --- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp +++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp @@ -40,7 +40,7 @@ namespace mongo { void EphemeralForTestRecoveryUnit::commitUnitOfWork() { try { for (Changes::iterator it = _changes.begin(), end = _changes.end(); it != end; ++it) { - (*it)->commit(); + (*it)->commit(boost::none); } _changes.clear(); } catch (...) { diff --git a/src/mongo/db/storage/kv/kv_catalog.cpp b/src/mongo/db/storage/kv/kv_catalog.cpp index 86ff663d7c1..85ed6c7f26b 100644 --- a/src/mongo/db/storage/kv/kv_catalog.cpp +++ b/src/mongo/db/storage/kv/kv_catalog.cpp @@ -139,7 +139,7 @@ public: AddIdentChange(KVCatalog* catalog, StringData ident) : _catalog(catalog), _ident(ident.toString()) {} - virtual void commit() {} + virtual void commit(boost::optional<Timestamp>) {} virtual void rollback() { stdx::lock_guard<stdx::mutex> lk(_catalog->_identsLock); _catalog->_idents.erase(_ident); @@ -154,7 +154,7 @@ public: RemoveIdentChange(KVCatalog* catalog, StringData ident, const Entry& entry) : _catalog(catalog), _ident(ident.toString()), _entry(entry) {} - virtual void commit() {} + virtual void commit(boost::optional<Timestamp>) {} virtual void rollback() { stdx::lock_guard<stdx::mutex> lk(_catalog->_identsLock); _catalog->_idents[_ident] = _entry; diff --git a/src/mongo/db/storage/kv/kv_collection_catalog_entry.cpp b/src/mongo/db/storage/kv/kv_collection_catalog_entry.cpp index 1171ba8467a..1163f89cf3c 100644 --- a/src/mongo/db/storage/kv/kv_collection_catalog_entry.cpp +++ b/src/mongo/db/storage/kv/kv_collection_catalog_entry.cpp @@ -55,7 +55,7 @@ public: AddIndexChange(OperationContext* opCtx, KVCollectionCatalogEntry* cce, StringData ident) : _opCtx(opCtx), _cce(cce), _ident(ident.toString()) {} - virtual void commit() {} + virtual void commit(boost::optional<Timestamp>) {} virtual void rollback() { // Intentionally ignoring failure. _cce->_engine->dropIdent(_opCtx, _ident).transitional_ignore(); @@ -72,7 +72,7 @@ public: : _opCtx(opCtx), _cce(cce), _ident(ident.toString()) {} virtual void rollback() {} - virtual void commit() { + virtual void commit(boost::optional<Timestamp>) { // Intentionally ignoring failure here. Since we've removed the metadata pointing to the // index, we should never see it again anyway. _cce->_engine->dropIdent(_opCtx, _ident).transitional_ignore(); diff --git a/src/mongo/db/storage/kv/kv_database_catalog_entry_base.cpp b/src/mongo/db/storage/kv/kv_database_catalog_entry_base.cpp index 88f1b7b7e55..49868d7374a 100644 --- a/src/mongo/db/storage/kv/kv_database_catalog_entry_base.cpp +++ b/src/mongo/db/storage/kv/kv_database_catalog_entry_base.cpp @@ -58,7 +58,7 @@ public: _ident(ident.toString()), _dropOnRollback(dropOnRollback) {} - virtual void commit() {} + virtual void commit(boost::optional<Timestamp>) {} virtual void rollback() { if (_dropOnRollback) { // Intentionally ignoring failure @@ -94,7 +94,7 @@ public: _entry(entry), _dropOnCommit(dropOnCommit) {} - virtual void commit() { + virtual void commit(boost::optional<Timestamp>) { delete _entry; // Intentionally ignoring failure here. Since we've removed the metadata pointing to the diff --git a/src/mongo/db/storage/kv/kv_storage_engine.cpp b/src/mongo/db/storage/kv/kv_storage_engine.cpp index e863e1446b0..7c6fb248668 100644 --- a/src/mongo/db/storage/kv/kv_storage_engine.cpp +++ b/src/mongo/db/storage/kv/kv_storage_engine.cpp @@ -61,7 +61,7 @@ public: RemoveDBChange(KVStorageEngine* engine, StringData db, KVDatabaseCatalogEntryBase* entry) : _engine(engine), _db(db.toString()), _entry(entry) {} - virtual void commit() { + virtual void commit(boost::optional<Timestamp>) { delete _entry; } diff --git a/src/mongo/db/storage/mmap_v1/btree/btree_logic.cpp b/src/mongo/db/storage/mmap_v1/btree/btree_logic.cpp index 57aba093807..dc0e4aa83e2 100644 --- a/src/mongo/db/storage/mmap_v1/btree/btree_logic.cpp +++ b/src/mongo/db/storage/mmap_v1/btree/btree_logic.cpp @@ -107,7 +107,7 @@ class BtreeLogic<BtreeLayout>::Builder::SetRightLeafLocChange : public RecoveryU public: SetRightLeafLocChange(Builder* builder, DiskLoc oldLoc) : _builder(builder), _oldLoc(oldLoc) {} - virtual void commit() {} + virtual void commit(boost::optional<Timestamp>) {} virtual void rollback() { _builder->_rightLeafLoc = _oldLoc; } diff --git a/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp b/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp index 2b235f71961..ff5b114975f 100644 --- a/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp +++ b/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp @@ -82,7 +82,7 @@ void DurRecoveryUnit::commitChanges() { try { for (Changes::const_iterator it = _changes.begin(), end = _changes.end(); it != end; ++it) { - (*it)->commit(); + (*it)->commit(boost::none); } } catch (...) { std::terminate(); diff --git a/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h b/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h index 1575a721f2d..b21b8cffe90 100644 --- a/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h +++ b/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h @@ -196,7 +196,7 @@ public: virtual void abandonSnapshot() {} virtual void registerChange(Change* change) { - change->commit(); + change->commit(boost::none); delete change; } diff --git a/src/mongo/db/storage/mmap_v1/mmap_v1_database_catalog_entry.cpp b/src/mongo/db/storage/mmap_v1/mmap_v1_database_catalog_entry.cpp index 31440e91bc5..369681a8298 100644 --- a/src/mongo/db/storage/mmap_v1/mmap_v1_database_catalog_entry.cpp +++ b/src/mongo/db/storage/mmap_v1/mmap_v1_database_catalog_entry.cpp @@ -122,7 +122,7 @@ public: _entry->_removeFromCache(NULL, _ns); } - void commit() {} + void commit(boost::optional<Timestamp>) {} private: const std::string _ns; @@ -145,7 +145,7 @@ public: _catalogEntry->_collections[_ns] = _cachedEntry; } - void commit() { + void commit(boost::optional<Timestamp>) { delete _cachedEntry; } diff --git a/src/mongo/db/storage/mobile/mobile_record_store.cpp b/src/mongo/db/storage/mobile/mobile_record_store.cpp index b89ca31e6b6..9fbe1aaac42 100644 --- a/src/mongo/db/storage/mobile/mobile_record_store.cpp +++ b/src/mongo/db/storage/mobile/mobile_record_store.cpp @@ -681,7 +681,7 @@ class MobileRecordStore::NumRecsChange final : public RecoveryUnit::Change { public: NumRecsChange(MobileRecordStore* rs, int64_t diff) : _rs(rs), _diff(diff) {} - void commit() override {} + void commit(boost::optional<Timestamp>) override {} void rollback() override { stdx::lock_guard<stdx::mutex> lock(_rs->_numRecsMutex); @@ -718,7 +718,7 @@ class MobileRecordStore::DataSizeChange final : public RecoveryUnit::Change { public: DataSizeChange(MobileRecordStore* rs, int64_t diff) : _rs(rs), _diff(diff) {} - void commit() override {} + void commit(boost::optional<Timestamp>) override {} void rollback() override { stdx::lock_guard<stdx::mutex> lock(_rs->_dataSizeMutex); diff --git a/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp b/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp index 16a8da49825..2d73e6b2124 100644 --- a/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp +++ b/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp @@ -53,7 +53,7 @@ void MobileRecoveryUnit::_commit() { for (auto& change : _changes) { try { - change->commit(); + change->commit(boost::none); } catch (...) { std::terminate(); } diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h index 224e0956ed9..04442aecd8e 100644 --- a/src/mongo/db/storage/recovery_unit.h +++ b/src/mongo/db/storage/recovery_unit.h @@ -254,13 +254,17 @@ public: * that rollback() and commit() may be called after resources with a shorter lifetime than * the WriteUnitOfWork have been freed. Each registered change will be committed or rolled * back once. + * + * commit() handlers are passed the timestamp at which the transaction is committed. If the + * transaction is not committed at a particular timestamp, or if the storage engine does not + * support timestamps, then boost::none will be supplied for this parameter. */ class Change { public: virtual ~Change() {} virtual void rollback() = 0; - virtual void commit() = 0; + virtual void commit(boost::optional<Timestamp> commitTime) = 0; }; /** @@ -287,7 +291,7 @@ public: void rollback() final { _callback(); } - void commit() final {} + void commit(boost::optional<Timestamp>) final {} private: Callback _callback; @@ -307,8 +311,8 @@ public: public: OnCommitChange(Callback&& callback) : _callback(std::move(callback)) {} void rollback() final {} - void commit() final { - _callback(); + void commit(boost::optional<Timestamp> commitTime) final { + _callback(commitTime); } private: diff --git a/src/mongo/db/storage/recovery_unit_noop.h b/src/mongo/db/storage/recovery_unit_noop.h index 0dbc41de3d5..9713b6aa1ec 100644 --- a/src/mongo/db/storage/recovery_unit_noop.h +++ b/src/mongo/db/storage/recovery_unit_noop.h @@ -43,7 +43,7 @@ public: void commitUnitOfWork() final { for (auto& change : _changes) { try { - change->commit(); + change->commit(boost::none); } catch (...) { std::terminate(); } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index bbefc5c9b11..78ca842d694 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -105,7 +105,7 @@ public: _highestInserted(highestInserted), _countInserted(countInserted) {} - void commit() final { + void commit(boost::optional<Timestamp>) final { invariant(_bytesInserted >= 0); invariant(_highestInserted.isNormal()); @@ -129,7 +129,7 @@ class WiredTigerRecordStore::OplogStones::TruncateChange final : public Recovery public: TruncateChange(OplogStones* oplogStones) : _oplogStones(oplogStones) {} - void commit() final { + void commit(boost::optional<Timestamp>) final { _oplogStones->_currentRecords.store(0); _oplogStones->_currentBytes.store(0); @@ -1606,7 +1606,7 @@ WiredTigerRecoveryUnit* WiredTigerRecordStore::_getRecoveryUnit(OperationContext class WiredTigerRecordStore::NumRecordsChange : public RecoveryUnit::Change { public: NumRecordsChange(WiredTigerRecordStore* rs, int64_t diff) : _rs(rs), _diff(diff) {} - virtual void commit() {} + virtual void commit(boost::optional<Timestamp>) {} virtual void rollback() { _rs->_numRecords.fetchAndAdd(-_diff); } @@ -1629,7 +1629,7 @@ void WiredTigerRecordStore::_changeNumRecords(OperationContext* opCtx, int64_t d class WiredTigerRecordStore::DataSizeChange : public RecoveryUnit::Change { public: DataSizeChange(WiredTigerRecordStore* rs, int64_t amount) : _rs(rs), _amount(amount) {} - virtual void commit() {} + virtual void commit(boost::optional<Timestamp>) {} virtual void rollback() { _rs->_increaseDataSize(NULL, -_amount); } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp index 906eeb7ac6b..2396e17cd45 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp @@ -77,6 +77,11 @@ WiredTigerRecoveryUnit::~WiredTigerRecoveryUnit() { } void WiredTigerRecoveryUnit::_commit() { + // Since we cannot have both a _lastTimestampSet and a _commitTimestamp, we set the + // commit time as whichever is non-empty. If both are empty, then _lastTimestampSet will + // be boost::none and we'll set the commit time to that. + auto commitTime = _commitTimestamp.isNull() ? _lastTimestampSet : _commitTimestamp; + try { bool notifyDone = !_prepareTimestamp.isNull(); if (_session && _active) { @@ -92,7 +97,7 @@ void WiredTigerRecoveryUnit::_commit() { } for (Changes::const_iterator it = _changes.begin(), end = _changes.end(); it != end; ++it) { - (*it)->commit(); + (*it)->commit(commitTime); } _changes.clear(); @@ -273,6 +278,18 @@ void WiredTigerRecoveryUnit::_txnClose(bool commit) { } invariantWTOK(wtRet); + invariant(!_lastTimestampSet || _commitTimestamp.isNull(), + str::stream() << "Cannot have both a _lastTimestampSet and a " + "_commitTimestamp. _lastTimestampSet: " + << _lastTimestampSet->toString() + << ". _commitTimestamp: " + << _commitTimestamp.toString()); + + // We reset the _lastTimestampSet between transactions. Since it is legal for one + // transaction on a RecoveryUnit to call setTimestamp() and another to call + // setCommitTimestamp(). + _lastTimestampSet = boost::none; + _active = false; _prepareTimestamp = Timestamp(); _mySnapshotId = nextSnapshotId.fetchAndAdd(1); @@ -382,6 +399,8 @@ Status WiredTigerRecoveryUnit::setTimestamp(Timestamp timestamp) { << " and trying to set WUOW timestamp to " << timestamp.toString()); + _lastTimestampSet = timestamp; + // Starts the WT transaction associated with this session. getSession(); @@ -399,6 +418,12 @@ void WiredTigerRecoveryUnit::setCommitTimestamp(Timestamp timestamp) { str::stream() << "Commit timestamp set to " << _commitTimestamp.toString() << " and trying to set it to " << timestamp.toString()); + invariant(!_lastTimestampSet, + str::stream() << "Last timestamp set is " << _lastTimestampSet->toString() + << " and trying to set commit timestamp to " + << timestamp.toString()); + invariant(!_isTimestamped); + _commitTimestamp = timestamp; } @@ -409,6 +434,11 @@ Timestamp WiredTigerRecoveryUnit::getCommitTimestamp() { void WiredTigerRecoveryUnit::clearCommitTimestamp() { invariant(!_inUnitOfWork); invariant(!_commitTimestamp.isNull()); + invariant(!_lastTimestampSet, + str::stream() << "Last timestamp set is " << _lastTimestampSet->toString() + << " and trying to clear commit timestamp."); + invariant(!_isTimestamped); + _commitTimestamp = Timestamp(); } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h index ff005d662bf..f679240d3df 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h @@ -162,6 +162,7 @@ private: bool _orderedCommit = true; Timestamp _commitTimestamp; Timestamp _prepareTimestamp; + boost::optional<Timestamp> _lastTimestampSet; uint64_t _mySnapshotId; Timestamp _majorityCommittedSnapshot; Timestamp _readAtTimestamp; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp index 2633054017b..62b9e188c34 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp @@ -229,5 +229,253 @@ TEST_F(WiredTigerRecoveryUnitTestFixture, WriteOnADocumentBeingPreparedTriggersW ru2->abortUnitOfWork(); } +TEST_F(WiredTigerRecoveryUnitTestFixture, + ChangeIsPassedEmptyLastTimestampSetOnCommitWithNoTimestamp) { + boost::optional<Timestamp> commitTs = boost::none; + auto opCtx = clientAndCtx1.second.get(); + { + WriteUnitOfWork wuow(opCtx); + opCtx->recoveryUnit()->onCommit( + [&](boost::optional<Timestamp> commitTime) { commitTs = commitTime; }); + wuow.commit(); + } + ASSERT(!commitTs); +} + +TEST_F(WiredTigerRecoveryUnitTestFixture, ChangeIsPassedLastTimestampSetOnCommit) { + boost::optional<Timestamp> commitTs = boost::none; + auto opCtx = clientAndCtx1.second.get(); + Timestamp ts1(5, 5); + Timestamp ts2(6, 6); + { + WriteUnitOfWork wuow(opCtx); + opCtx->recoveryUnit()->onCommit( + [&](boost::optional<Timestamp> commitTime) { commitTs = commitTime; }); + ASSERT_OK(opCtx->recoveryUnit()->setTimestamp(ts1)); + ASSERT(!commitTs); + ASSERT_OK(opCtx->recoveryUnit()->setTimestamp(ts2)); + ASSERT(!commitTs); + ASSERT_OK(opCtx->recoveryUnit()->setTimestamp(ts1)); + ASSERT(!commitTs); + wuow.commit(); + ASSERT_EQ(*commitTs, ts1); + } + ASSERT_EQ(*commitTs, ts1); +} + +TEST_F(WiredTigerRecoveryUnitTestFixture, ChangeIsNotPassedLastTimestampSetOnAbort) { + boost::optional<Timestamp> commitTs = boost::none; + auto opCtx = clientAndCtx1.second.get(); + Timestamp ts1(5, 5); + { + WriteUnitOfWork wuow(opCtx); + opCtx->recoveryUnit()->onCommit( + [&](boost::optional<Timestamp> commitTime) { commitTs = commitTime; }); + ASSERT_OK(opCtx->recoveryUnit()->setTimestamp(ts1)); + ASSERT(!commitTs); + } + ASSERT(!commitTs); +} + +TEST_F(WiredTigerRecoveryUnitTestFixture, ChangeIsPassedCommitTimestamp) { + boost::optional<Timestamp> commitTs = boost::none; + auto opCtx = clientAndCtx1.second.get(); + Timestamp ts1(5, 5); + + opCtx->recoveryUnit()->setCommitTimestamp(ts1); + ASSERT(!commitTs); + + { + WriteUnitOfWork wuow(opCtx); + opCtx->recoveryUnit()->onCommit( + [&](boost::optional<Timestamp> commitTime) { commitTs = commitTime; }); + ASSERT(!commitTs); + wuow.commit(); + ASSERT_EQ(*commitTs, ts1); + } + ASSERT_EQ(*commitTs, ts1); +} + +TEST_F(WiredTigerRecoveryUnitTestFixture, ChangeIsNotPassedCommitTimestampIfCleared) { + boost::optional<Timestamp> commitTs = boost::none; + auto opCtx = clientAndCtx1.second.get(); + Timestamp ts1(5, 5); + + opCtx->recoveryUnit()->setCommitTimestamp(ts1); + ASSERT(!commitTs); + opCtx->recoveryUnit()->clearCommitTimestamp(); + ASSERT(!commitTs); + + { + WriteUnitOfWork wuow(opCtx); + opCtx->recoveryUnit()->onCommit( + [&](boost::optional<Timestamp> commitTime) { commitTs = commitTime; }); + ASSERT(!commitTs); + wuow.commit(); + } + ASSERT(!commitTs); +} + +TEST_F(WiredTigerRecoveryUnitTestFixture, ChangeIsPassedNewestCommitTimestamp) { + boost::optional<Timestamp> commitTs = boost::none; + auto opCtx = clientAndCtx1.second.get(); + Timestamp ts1(5, 5); + Timestamp ts2(6, 6); + + opCtx->recoveryUnit()->setCommitTimestamp(ts2); + ASSERT(!commitTs); + opCtx->recoveryUnit()->clearCommitTimestamp(); + ASSERT(!commitTs); + opCtx->recoveryUnit()->setCommitTimestamp(ts1); + ASSERT(!commitTs); + + { + WriteUnitOfWork wuow(opCtx); + opCtx->recoveryUnit()->onCommit( + [&](boost::optional<Timestamp> commitTime) { commitTs = commitTime; }); + ASSERT(!commitTs); + wuow.commit(); + ASSERT_EQ(*commitTs, ts1); + } + ASSERT_EQ(*commitTs, ts1); +} + +TEST_F(WiredTigerRecoveryUnitTestFixture, ChangeIsNotPassedCommitTimestampOnAbort) { + boost::optional<Timestamp> commitTs = boost::none; + auto opCtx = clientAndCtx1.second.get(); + Timestamp ts1(5, 5); + + opCtx->recoveryUnit()->setCommitTimestamp(ts1); + ASSERT(!commitTs); + + { + WriteUnitOfWork wuow(opCtx); + opCtx->recoveryUnit()->onCommit( + [&](boost::optional<Timestamp> commitTime) { commitTs = commitTime; }); + ASSERT(!commitTs); + } + ASSERT(!commitTs); +} + +TEST_F(WiredTigerRecoveryUnitTestFixture, CommitTimestampBeforeSetTimestampOnCommit) { + boost::optional<Timestamp> commitTs = boost::none; + auto opCtx = clientAndCtx1.second.get(); + Timestamp ts1(5, 5); + Timestamp ts2(6, 6); + + opCtx->recoveryUnit()->setCommitTimestamp(ts2); + ASSERT(!commitTs); + + { + WriteUnitOfWork wuow(opCtx); + opCtx->recoveryUnit()->onCommit( + [&](boost::optional<Timestamp> commitTime) { commitTs = commitTime; }); + ASSERT(!commitTs); + wuow.commit(); + ASSERT_EQ(*commitTs, ts2); + } + ASSERT_EQ(*commitTs, ts2); + opCtx->recoveryUnit()->clearCommitTimestamp(); + + { + WriteUnitOfWork wuow(opCtx); + opCtx->recoveryUnit()->onCommit( + [&](boost::optional<Timestamp> commitTime) { commitTs = commitTime; }); + ASSERT_OK(opCtx->recoveryUnit()->setTimestamp(ts1)); + ASSERT_EQ(*commitTs, ts2); + wuow.commit(); + ASSERT_EQ(*commitTs, ts1); + } + ASSERT_EQ(*commitTs, ts1); +} + +TEST_F(WiredTigerRecoveryUnitTestFixture, CommitTimestampAfterSetTimestampOnCommit) { + boost::optional<Timestamp> commitTs = boost::none; + auto opCtx = clientAndCtx1.second.get(); + Timestamp ts1(5, 5); + Timestamp ts2(6, 6); + + { + WriteUnitOfWork wuow(opCtx); + opCtx->recoveryUnit()->onCommit( + [&](boost::optional<Timestamp> commitTime) { commitTs = commitTime; }); + ASSERT(!commitTs); + ASSERT_OK(opCtx->recoveryUnit()->setTimestamp(ts2)); + ASSERT(!commitTs); + wuow.commit(); + ASSERT_EQ(*commitTs, ts2); + } + ASSERT_EQ(*commitTs, ts2); + + opCtx->recoveryUnit()->setCommitTimestamp(ts1); + ASSERT_EQ(*commitTs, ts2); + + { + WriteUnitOfWork wuow(opCtx); + opCtx->recoveryUnit()->onCommit( + [&](boost::optional<Timestamp> commitTime) { commitTs = commitTime; }); + ASSERT_EQ(*commitTs, ts2); + wuow.commit(); + ASSERT_EQ(*commitTs, ts1); + } + ASSERT_EQ(*commitTs, ts1); +} + +TEST_F(WiredTigerRecoveryUnitTestFixture, CommitTimestampBeforeSetTimestampOnAbort) { + boost::optional<Timestamp> commitTs = boost::none; + auto opCtx = clientAndCtx1.second.get(); + Timestamp ts1(5, 5); + Timestamp ts2(6, 6); + + opCtx->recoveryUnit()->setCommitTimestamp(ts2); + ASSERT(!commitTs); + + { + WriteUnitOfWork wuow(opCtx); + opCtx->recoveryUnit()->onCommit( + [&](boost::optional<Timestamp> commitTime) { commitTs = commitTime; }); + ASSERT(!commitTs); + } + ASSERT(!commitTs); + opCtx->recoveryUnit()->clearCommitTimestamp(); + + { + WriteUnitOfWork wuow(opCtx); + opCtx->recoveryUnit()->onCommit( + [&](boost::optional<Timestamp> commitTime) { commitTs = commitTime; }); + ASSERT_OK(opCtx->recoveryUnit()->setTimestamp(ts1)); + ASSERT(!commitTs); + } + ASSERT(!commitTs); +} + +TEST_F(WiredTigerRecoveryUnitTestFixture, CommitTimestampAfterSetTimestampOnAbort) { + boost::optional<Timestamp> commitTs = boost::none; + auto opCtx = clientAndCtx1.second.get(); + Timestamp ts1(5, 5); + Timestamp ts2(6, 6); + + { + WriteUnitOfWork wuow(opCtx); + opCtx->recoveryUnit()->onCommit( + [&](boost::optional<Timestamp> commitTime) { commitTs = commitTime; }); + ASSERT(!commitTs); + ASSERT_OK(opCtx->recoveryUnit()->setTimestamp(ts2)); + ASSERT(!commitTs); + } + ASSERT(!commitTs); + + opCtx->recoveryUnit()->setCommitTimestamp(ts1); + ASSERT(!commitTs); + + { + WriteUnitOfWork wuow(opCtx); + opCtx->recoveryUnit()->onCommit( + [&](boost::optional<Timestamp> commitTime) { commitTs = commitTime; }); + ASSERT(!commitTs); + } + ASSERT(!commitTs); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/views/durable_view_catalog.cpp b/src/mongo/db/views/durable_view_catalog.cpp index 199c201f07c..b0076f03926 100644 --- a/src/mongo/db/views/durable_view_catalog.cpp +++ b/src/mongo/db/views/durable_view_catalog.cpp @@ -57,7 +57,8 @@ void DurableViewCatalog::onExternalChange(OperationContext* opCtx, const Namespa Database* db = DatabaseHolder::getDatabaseHolder().get(opCtx, name.db()); if (db) { - opCtx->recoveryUnit()->onCommit([db]() { db->getViewCatalog()->invalidate(); }); + opCtx->recoveryUnit()->onCommit( + [db](boost::optional<Timestamp>) { db->getViewCatalog()->invalidate(); }); } } diff --git a/src/mongo/db/views/view_catalog.cpp b/src/mongo/db/views/view_catalog.cpp index 1a029f0d5c4..16c660d8e5c 100644 --- a/src/mongo/db/views/view_catalog.cpp +++ b/src/mongo/db/views/view_catalog.cpp @@ -162,7 +162,8 @@ Status ViewCatalog::_createOrUpdateView_inlock(OperationContext* opCtx, }); // We may get invalidated, but we're exclusively locked, so the change must be ours. - opCtx->recoveryUnit()->onCommit([this]() { this->_valid.store(true); }); + opCtx->recoveryUnit()->onCommit( + [this](boost::optional<Timestamp>) { this->_valid.store(true); }); return Status::OK(); } @@ -377,7 +378,8 @@ Status ViewCatalog::dropView(OperationContext* opCtx, const NamespaceString& vie }); // We may get invalidated, but we're exclusively locked, so the change must be ours. - opCtx->recoveryUnit()->onCommit([this]() { this->_valid.store(true); }); + opCtx->recoveryUnit()->onCommit( + [this](boost::optional<Timestamp>) { this->_valid.store(true); }); return Status::OK(); } |