summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJudah Schvimer <judah@mongodb.com>2018-04-24 16:37:18 -0400
committerJudah Schvimer <judah@mongodb.com>2018-05-03 12:01:09 -0400
commitdd45579fef2bf4e212161d1e3d97666a80b08bbf (patch)
treeb12df773845f5994240d8c50b0e171f277beb12a /src
parentb1a7cffd3f151efe4d47864c89cf5c7cf9e884cc (diff)
downloadmongo-dd45579fef2bf4e212161d1e3d97666a80b08bbf.tar.gz
SERVER-34580 Plumb commit time to commit handlers when available
Diffstat (limited to 'src')
-rw-r--r--src/mongo/client/embedded/replication_coordinator_embedded.cpp4
-rw-r--r--src/mongo/client/embedded/replication_coordinator_embedded.h2
-rw-r--r--src/mongo/db/SConscript3
-rw-r--r--src/mongo/db/auth/authz_manager_external_state_local.cpp2
-rw-r--r--src/mongo/db/catalog/collection_impl.cpp9
-rw-r--r--src/mongo/db/catalog/database_impl.cpp10
-rw-r--r--src/mongo/db/catalog/index_catalog_entry_impl.cpp31
-rw-r--r--src/mongo/db/catalog/index_catalog_impl.cpp57
-rw-r--r--src/mongo/db/catalog/index_create_impl.cpp4
-rw-r--r--src/mongo/db/catalog/uuid_catalog.cpp2
-rw-r--r--src/mongo/db/commands/drop_indexes.cpp7
-rw-r--r--src/mongo/db/commands/feature_compatibility_version.cpp2
-rw-r--r--src/mongo/db/commands/snapshot_management.cpp4
-rw-r--r--src/mongo/db/logical_clock.cpp9
-rw-r--r--src/mongo/db/logical_clock.h6
-rw-r--r--src/mongo/db/repair_database.cpp7
-rw-r--r--src/mongo/db/repl/oplog.cpp57
-rw-r--r--src/mongo/db/repl/replication_coordinator.h15
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp21
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h2
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.cpp4
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.h2
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp2
-rw-r--r--src/mongo/db/s/shard_server_op_observer.cpp4
-rw-r--r--src/mongo/db/session.cpp3
-rw-r--r--src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_btree_impl.cpp2
-rw-r--r--src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp6
-rw-r--r--src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp2
-rw-r--r--src/mongo/db/storage/kv/kv_catalog.cpp4
-rw-r--r--src/mongo/db/storage/kv/kv_collection_catalog_entry.cpp4
-rw-r--r--src/mongo/db/storage/kv/kv_database_catalog_entry_base.cpp4
-rw-r--r--src/mongo/db/storage/kv/kv_storage_engine.cpp2
-rw-r--r--src/mongo/db/storage/mmap_v1/btree/btree_logic.cpp2
-rw-r--r--src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp2
-rw-r--r--src/mongo/db/storage/mmap_v1/heap_record_store_btree.h2
-rw-r--r--src/mongo/db/storage/mmap_v1/mmap_v1_database_catalog_entry.cpp4
-rw-r--r--src/mongo/db/storage/mobile/mobile_record_store.cpp4
-rw-r--r--src/mongo/db/storage/mobile/mobile_recovery_unit.cpp2
-rw-r--r--src/mongo/db/storage/recovery_unit.h12
-rw-r--r--src/mongo/db/storage/recovery_unit_noop.h2
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp8
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp32
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h1
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp248
-rw-r--r--src/mongo/db/views/durable_view_catalog.cpp3
-rw-r--r--src/mongo/db/views/view_catalog.cpp6
-rw-r--r--src/mongo/scripting/engine.cpp2
47 files changed, 449 insertions, 174 deletions
diff --git a/src/mongo/client/embedded/replication_coordinator_embedded.cpp b/src/mongo/client/embedded/replication_coordinator_embedded.cpp
index 85d272a6bb7..52484a49d8b 100644
--- a/src/mongo/client/embedded/replication_coordinator_embedded.cpp
+++ b/src/mongo/client/embedded/replication_coordinator_embedded.cpp
@@ -118,10 +118,6 @@ bool ReplicationCoordinatorEmbedded::isReplEnabled() const {
return false;
}
-Timestamp ReplicationCoordinatorEmbedded::getMinimumVisibleSnapshot(OperationContext* opCtx) {
- return Timestamp();
-}
-
WriteConcernOptions ReplicationCoordinatorEmbedded::populateUnsetWriteConcernOptionsSyncMode(
WriteConcernOptions wc) {
WriteConcernOptions writeConcern(wc);
diff --git a/src/mongo/client/embedded/replication_coordinator_embedded.h b/src/mongo/client/embedded/replication_coordinator_embedded.h
index d76387a5ca1..c30cb6a77c8 100644
--- a/src/mongo/client/embedded/replication_coordinator_embedded.h
+++ b/src/mongo/client/embedded/replication_coordinator_embedded.h
@@ -79,8 +79,6 @@ public:
WriteConcernOptions getGetLastErrorDefault() override;
- Timestamp getMinimumVisibleSnapshot(OperationContext* opCtx) override;
-
WriteConcernOptions populateUnsetWriteConcernOptionsSyncMode(WriteConcernOptions wc) override;
bool buildsIndexes() override;
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index f33b74d7625..fdb33e83840 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -869,13 +869,13 @@ env.Library(
'$BUILD_DIR/mongo/db/catalog/database',
'$BUILD_DIR/mongo/db/storage/mmap_v1/repair_database_interface',
'background',
+ 'logical_clock',
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/catalog/document_validation',
'$BUILD_DIR/mongo/db/catalog/index_create',
'$BUILD_DIR/mongo/db/catalog/index_key_validate',
'$BUILD_DIR/mongo/db/repl/oplog',
- '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
],
)
@@ -1542,6 +1542,7 @@ env.Library(
'logical_clock.cpp',
],
LIBDEPS=[
+ 'global_settings',
'logical_time',
'server_parameters',
'service_context',
diff --git a/src/mongo/db/auth/authz_manager_external_state_local.cpp b/src/mongo/db/auth/authz_manager_external_state_local.cpp
index 56cd2e2969f..2a35c3a2ffe 100644
--- a/src/mongo/db/auth/authz_manager_external_state_local.cpp
+++ b/src/mongo/db/auth/authz_manager_external_state_local.cpp
@@ -539,7 +539,7 @@ public:
_isO2Set(o2 ? true : false),
_o2(_isO2Set ? o2->getOwned() : BSONObj()) {}
- virtual void commit() {
+ virtual void commit(boost::optional<Timestamp>) {
stdx::lock_guard<stdx::mutex> lk(_externalState->_roleGraphMutex);
Status status = _externalState->_roleGraph.handleLogOp(
_opCtx, _op.c_str(), _nss, _o, _isO2Set ? &_o2 : NULL);
diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp
index 0c8eaa0e7d9..253abd2f73e 100644
--- a/src/mongo/db/catalog/collection_impl.cpp
+++ b/src/mongo/db/catalog/collection_impl.cpp
@@ -328,7 +328,8 @@ Status CollectionImpl::insertDocumentsForOplog(OperationContext* opCtx,
if (!status.isOK())
return status;
- opCtx->recoveryUnit()->onCommit([this]() { notifyCappedWaitersIfNeeded(); });
+ opCtx->recoveryUnit()->onCommit(
+ [this](boost::optional<Timestamp>) { notifyCappedWaitersIfNeeded(); });
return status;
}
@@ -380,7 +381,8 @@ Status CollectionImpl::insertDocuments(OperationContext* opCtx,
getGlobalServiceContext()->getOpObserver()->onInserts(
opCtx, ns(), uuid(), begin, end, fromMigrate);
- opCtx->recoveryUnit()->onCommit([this]() { notifyCappedWaitersIfNeeded(); });
+ opCtx->recoveryUnit()->onCommit(
+ [this](boost::optional<Timestamp>) { notifyCappedWaitersIfNeeded(); });
return Status::OK();
}
@@ -449,7 +451,8 @@ Status CollectionImpl::insertDocument(OperationContext* opCtx,
getGlobalServiceContext()->getOpObserver()->onInserts(
opCtx, ns(), uuid(), inserts.begin(), inserts.end(), false);
- opCtx->recoveryUnit()->onCommit([this]() { notifyCappedWaitersIfNeeded(); });
+ opCtx->recoveryUnit()->onCommit(
+ [this](boost::optional<Timestamp>) { notifyCappedWaitersIfNeeded(); });
return loc.getStatus();
}
diff --git a/src/mongo/db/catalog/database_impl.cpp b/src/mongo/db/catalog/database_impl.cpp
index 4d195d3cf58..fea411a7177 100644
--- a/src/mongo/db/catalog/database_impl.cpp
+++ b/src/mongo/db/catalog/database_impl.cpp
@@ -111,16 +111,16 @@ public:
AddCollectionChange(OperationContext* opCtx, DatabaseImpl* db, StringData ns)
: _opCtx(opCtx), _db(db), _ns(ns.toString()) {}
- virtual void commit() {
+ virtual void commit(boost::optional<Timestamp> commitTime) {
CollectionMap::const_iterator it = _db->_collections.find(_ns);
if (it == _db->_collections.end())
return;
// Ban reading from this collection on committed reads on snapshots before now.
- auto replCoord = repl::ReplicationCoordinator::get(_opCtx);
- auto snapshotName = replCoord->getMinimumVisibleSnapshot(_opCtx);
- it->second->setMinimumVisibleSnapshot(snapshotName);
+ if (commitTime) {
+ it->second->setMinimumVisibleSnapshot(commitTime.get());
+ }
}
virtual void rollback() {
@@ -143,7 +143,7 @@ public:
// Takes ownership of coll (but not db).
RemoveCollectionChange(DatabaseImpl* db, Collection* coll) : _db(db), _coll(coll) {}
- virtual void commit() {
+ virtual void commit(boost::optional<Timestamp>) {
delete _coll;
}
diff --git a/src/mongo/db/catalog/index_catalog_entry_impl.cpp b/src/mongo/db/catalog/index_catalog_entry_impl.cpp
index 1ef395b85b4..7c5f71a1b7f 100644
--- a/src/mongo/db/catalog/index_catalog_entry_impl.cpp
+++ b/src/mongo/db/catalog/index_catalog_entry_impl.cpp
@@ -184,7 +184,7 @@ class IndexCatalogEntryImpl::SetHeadChange : public RecoveryUnit::Change {
public:
SetHeadChange(IndexCatalogEntryImpl* ice, RecordId oldHead) : _ice(ice), _oldHead(oldHead) {}
- virtual void commit() {}
+ virtual void commit(boost::optional<Timestamp>) {}
virtual void rollback() {
_ice->_head = _oldHead;
}
@@ -273,22 +273,23 @@ void IndexCatalogEntryImpl::setMultikey(OperationContext* opCtx,
// When the recovery unit commits, update the multikey paths if needed and clear the plan cache
// if the index metadata has changed.
- opCtx->recoveryUnit()->onCommit([this, multikeyPaths, indexMetadataHasChanged] {
- _isMultikey.store(true);
-
- if (_indexTracksPathLevelMultikeyInfo) {
- stdx::lock_guard<stdx::mutex> lk(_indexMultikeyPathsMutex);
- for (size_t i = 0; i < multikeyPaths.size(); ++i) {
- _indexMultikeyPaths[i].insert(multikeyPaths[i].begin(), multikeyPaths[i].end());
+ opCtx->recoveryUnit()->onCommit(
+ [this, multikeyPaths, indexMetadataHasChanged](boost::optional<Timestamp>) {
+ _isMultikey.store(true);
+
+ if (_indexTracksPathLevelMultikeyInfo) {
+ stdx::lock_guard<stdx::mutex> lk(_indexMultikeyPathsMutex);
+ for (size_t i = 0; i < multikeyPaths.size(); ++i) {
+ _indexMultikeyPaths[i].insert(multikeyPaths[i].begin(), multikeyPaths[i].end());
+ }
}
- }
- if (indexMetadataHasChanged && _infoCache) {
- LOG(1) << _ns << ": clearing plan cache - index " << _descriptor->keyPattern()
- << " set to multi key.";
- _infoCache->clearQueryCache();
- }
- });
+ if (indexMetadataHasChanged && _infoCache) {
+ LOG(1) << _ns << ": clearing plan cache - index " << _descriptor->keyPattern()
+ << " set to multi key.";
+ _infoCache->clearQueryCache();
+ }
+ });
}
// ----
diff --git a/src/mongo/db/catalog/index_catalog_impl.cpp b/src/mongo/db/catalog/index_catalog_impl.cpp
index 65cfa0dbb1c..572b54e4a05 100644
--- a/src/mongo/db/catalog/index_catalog_impl.cpp
+++ b/src/mongo/db/catalog/index_catalog_impl.cpp
@@ -54,6 +54,7 @@
#include "mongo/db/index_names.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/keypattern.h"
+#include "mongo/db/logical_clock.h"
#include "mongo/db/matcher/expression.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/ops/delete.h"
@@ -61,7 +62,6 @@
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
#include "mongo/util/assert_util.h"
@@ -425,12 +425,13 @@ Status IndexCatalogImpl::IndexBuildBlock::init() {
if (isBackgroundIndex) {
_opCtx->recoveryUnit()->onCommit(
- [ opCtx = _opCtx, entry = _entry, collection = _collection ] {
+ [ opCtx = _opCtx, entry = _entry, collection = _collection ](
+ boost::optional<Timestamp> commitTime) {
// This will prevent the unfinished index from being visible on index iterators.
- auto minVisible =
- repl::ReplicationCoordinator::get(opCtx)->getMinimumVisibleSnapshot(opCtx);
- entry->setMinimumVisibleSnapshot(minVisible);
- collection->setMinimumVisibleSnapshot(minVisible);
+ if (commitTime) {
+ entry->setMinimumVisibleSnapshot(commitTime.get());
+ collection->setMinimumVisibleSnapshot(commitTime.get());
+ }
});
}
@@ -481,19 +482,24 @@ void IndexCatalogImpl::IndexBuildBlock::success() {
OperationContext* opCtx = _opCtx;
LOG(2) << "marking index " << _indexName << " as ready in snapshot id "
<< opCtx->recoveryUnit()->getSnapshotId();
- _opCtx->recoveryUnit()->onCommit([opCtx, entry, collection] {
- // Note: this runs after the WUOW commits but before we release our X lock on the
- // collection. This means that any snapshot created after this must include the full index,
- // and no one can try to read this index before we set the visibility.
- auto replCoord = repl::ReplicationCoordinator::get(opCtx);
- auto snapshotName = replCoord->getMinimumVisibleSnapshot(opCtx);
- entry->setMinimumVisibleSnapshot(snapshotName);
-
- // TODO remove this once SERVER-20439 is implemented. It is a stopgap solution for
- // SERVER-20260 to make sure that reads with majority readConcern level can see indexes that
- // are created with w:majority by making the readers block.
- collection->setMinimumVisibleSnapshot(snapshotName);
- });
+ _opCtx->recoveryUnit()->onCommit(
+ [opCtx, entry, collection](boost::optional<Timestamp> commitTime) {
+ // Note: this runs after the WUOW commits but before we release our X lock on the
+ // collection. This means that any snapshot created after this must include the full
+ // index, and no one can try to read this index before we set the visibility.
+ if (!commitTime) {
+ // The end of background index builds on secondaries does not get a commit
+ // timestamp. We use the cluster time since it's guaranteed to be greater than the
+ // time of the index build. It is possible the cluster time could be in the future,
+ // and we will need to do another write to reach the minimum visible snapshot.
+ commitTime = LogicalClock::getClusterTimeForReplicaSet(opCtx).asTimestamp();
+ }
+ entry->setMinimumVisibleSnapshot(commitTime.get());
+ // We must also set the minimum visible snapshot on the collection like during init().
+ // This prevents reads in the past from reading inconsistent metadata. We should be
+ // able to remove this when the catalog is versioned.
+ collection->setMinimumVisibleSnapshot(commitTime.get());
+ });
entry->setIsReady(true);
}
@@ -983,11 +989,16 @@ public:
IndexCatalogEntry* entry)
: _opCtx(opCtx), _collection(collection), _entries(entries), _entry(entry) {}
- void commit() final {
+ void commit(boost::optional<Timestamp> commitTime) final {
// Ban reading from this collection on committed reads on snapshots before now.
- auto replCoord = repl::ReplicationCoordinator::get(_opCtx);
- auto snapshotName = replCoord->getMinimumVisibleSnapshot(_opCtx);
- _collection->setMinimumVisibleSnapshot(snapshotName);
+ if (!commitTime) {
+ // This is called when we refresh the index catalog entry, which does not always have
+ // a commit timestamp. We use the cluster time since it's guaranteed to be greater than
+ // the time of the index removal. It is possible the cluster time could be in the
+ // future, and we will need to do another write to reach the minimum visible snapshot.
+ commitTime = LogicalClock::getClusterTimeForReplicaSet(_opCtx).asTimestamp();
+ }
+ _collection->setMinimumVisibleSnapshot(commitTime.get());
delete _entry;
}
diff --git a/src/mongo/db/catalog/index_create_impl.cpp b/src/mongo/db/catalog/index_create_impl.cpp
index aa04c84b8e9..2a39729f3db 100644
--- a/src/mongo/db/catalog/index_create_impl.cpp
+++ b/src/mongo/db/catalog/index_create_impl.cpp
@@ -133,7 +133,7 @@ class MultiIndexBlockImpl::SetNeedToCleanupOnRollback : public RecoveryUnit::Cha
public:
explicit SetNeedToCleanupOnRollback(MultiIndexBlockImpl* indexer) : _indexer(indexer) {}
- virtual void commit() {}
+ virtual void commit(boost::optional<Timestamp>) {}
virtual void rollback() {
_indexer->_needToCleanup = true;
}
@@ -151,7 +151,7 @@ class MultiIndexBlockImpl::CleanupIndexesVectorOnRollback : public RecoveryUnit:
public:
explicit CleanupIndexesVectorOnRollback(MultiIndexBlockImpl* indexer) : _indexer(indexer) {}
- virtual void commit() {}
+ virtual void commit(boost::optional<Timestamp>) {}
virtual void rollback() {
_indexer->_indexes.clear();
}
diff --git a/src/mongo/db/catalog/uuid_catalog.cpp b/src/mongo/db/catalog/uuid_catalog.cpp
index 0a50cadfa71..7b43be4f1b1 100644
--- a/src/mongo/db/catalog/uuid_catalog.cpp
+++ b/src/mongo/db/catalog/uuid_catalog.cpp
@@ -127,7 +127,7 @@ void UUIDCatalog::onRenameCollection(OperationContext* opCtx,
GetNewCollectionFunction getNewCollection,
CollectionUUID uuid) {
Collection* oldColl = removeUUIDCatalogEntry(uuid);
- opCtx->recoveryUnit()->onCommit([this, getNewCollection, uuid] {
+ opCtx->recoveryUnit()->onCommit([this, getNewCollection, uuid](boost::optional<Timestamp>) {
// Reset current UUID entry in case some other operation updates the UUID catalog before the
// WUOW is committed. registerUUIDCatalogEntry() is a no-op if there's an existing UUID
// entry.
diff --git a/src/mongo/db/commands/drop_indexes.cpp b/src/mongo/db/commands/drop_indexes.cpp
index f6ebf5ec180..8d8fc318eff 100644
--- a/src/mongo/db/commands/drop_indexes.cpp
+++ b/src/mongo/db/commands/drop_indexes.cpp
@@ -51,8 +51,8 @@
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/index_builder.h"
+#include "mongo/db/logical_clock.h"
#include "mongo/db/op_observer.h"
-#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/service_context.h"
#include "mongo/util/log.h"
@@ -218,9 +218,8 @@ public:
// This was also done when dropAllIndexes() committed, but we need to ensure that no one
// tries to read in the intermediate state where all indexes are newer than the current
// snapshot so are unable to be used.
- auto replCoord = repl::ReplicationCoordinator::get(opCtx);
- auto snapshotName = replCoord->getMinimumVisibleSnapshot(opCtx);
- collection->setMinimumVisibleSnapshot(snapshotName);
+ auto clusterTime = LogicalClock::getClusterTimeForReplicaSet(opCtx).asTimestamp();
+ collection->setMinimumVisibleSnapshot(clusterTime);
result.append("nIndexes", static_cast<int>(swIndexesToRebuild.getValue().size()));
result.append("indexes", swIndexesToRebuild.getValue());
diff --git a/src/mongo/db/commands/feature_compatibility_version.cpp b/src/mongo/db/commands/feature_compatibility_version.cpp
index c33031abc8d..c9cacca5f32 100644
--- a/src/mongo/db/commands/feature_compatibility_version.cpp
+++ b/src/mongo/db/commands/feature_compatibility_version.cpp
@@ -159,7 +159,7 @@ void FeatureCompatibilityVersion::onInsertOrUpdate(OperationContext* opCtx, cons
<< FeatureCompatibilityVersionParser::toString(newVersion);
}
- opCtx->recoveryUnit()->onCommit([opCtx, newVersion]() {
+ opCtx->recoveryUnit()->onCommit([opCtx, newVersion](boost::optional<Timestamp>) {
serverGlobalParams.featureCompatibility.setVersion(newVersion);
updateMinWireVersion();
diff --git a/src/mongo/db/commands/snapshot_management.cpp b/src/mongo/db/commands/snapshot_management.cpp
index 7059f937318..9a9084b8723 100644
--- a/src/mongo/db/commands/snapshot_management.cpp
+++ b/src/mongo/db/commands/snapshot_management.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/commands.h"
#include "mongo/db/commands/test_commands_enabled.h"
#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/logical_clock.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/service_context.h"
@@ -79,8 +80,7 @@ public:
Lock::GlobalLock lk(opCtx, MODE_IX);
- const auto name =
- repl::ReplicationCoordinator::get(opCtx)->getMinimumVisibleSnapshot(opCtx);
+ auto name = LogicalClock::getClusterTimeForReplicaSet(opCtx).asTimestamp();
result.append("name", static_cast<long long>(name.asULL()));
return CommandHelpers::appendCommandStatus(result, Status::OK());
diff --git a/src/mongo/db/logical_clock.cpp b/src/mongo/db/logical_clock.cpp
index a9b7cd62485..efa2d55831b 100644
--- a/src/mongo/db/logical_clock.cpp
+++ b/src/mongo/db/logical_clock.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/logical_clock.h"
#include "mongo/base/status.h"
+#include "mongo/db/global_settings.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
@@ -76,6 +77,14 @@ bool lessThanOrEqualToMaxPossibleTime(LogicalTime time, uint64_t nTicks) {
}
}
+LogicalTime LogicalClock::getClusterTimeForReplicaSet(OperationContext* opCtx) {
+ if (getGlobalReplSettings().usingReplSets()) {
+ return get(opCtx)->getClusterTime();
+ }
+
+ return {};
+}
+
LogicalClock* LogicalClock::get(ServiceContext* service) {
return getLogicalClock(service).get();
}
diff --git a/src/mongo/db/logical_clock.h b/src/mongo/db/logical_clock.h
index fc662eb78ad..e0f5c135c1c 100644
--- a/src/mongo/db/logical_clock.h
+++ b/src/mongo/db/logical_clock.h
@@ -52,6 +52,12 @@ public:
Seconds(365 * 24 * 60 * 60); // 1 year
/**
+ * Returns the current cluster time if this is a replica set node, otherwise returns a null
+ * logical time.
+ */
+ static LogicalTime getClusterTimeForReplicaSet(OperationContext* opCtx);
+
+ /**
* Creates an instance of LogicalClock.
*/
LogicalClock(ServiceContext*);
diff --git a/src/mongo/db/repair_database.cpp b/src/mongo/db/repair_database.cpp
index 7c106ec1d62..df89ce310c6 100644
--- a/src/mongo/db/repair_database.cpp
+++ b/src/mongo/db/repair_database.cpp
@@ -50,7 +50,7 @@
#include "mongo/db/catalog/namespace_uuid_cache.h"
#include "mongo/db/catalog/uuid_catalog.h"
#include "mongo/db/index/index_descriptor.h"
-#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/logical_clock.h"
#include "mongo/db/storage/mmap_v1/repair_database_interface.h"
#include "mongo/db/storage/storage_engine.h"
#include "mongo/util/log.h"
@@ -278,11 +278,10 @@ Status repairDatabase(OperationContext* opCtx,
// Set the minimum snapshot for all Collections in this db. This ensures that readers
// using majority readConcern level can only use the collections after their repaired
// versions are in the committed view.
- auto replCoord = repl::ReplicationCoordinator::get(opCtx);
- auto snapshotName = replCoord->getMinimumVisibleSnapshot(opCtx);
+ auto clusterTime = LogicalClock::getClusterTimeForReplicaSet(opCtx).asTimestamp();
for (auto&& collection : *db) {
- collection->setMinimumVisibleSnapshot(snapshotName);
+ collection->setMinimumVisibleSnapshot(clusterTime);
}
// Restore oplog Collection pointer cache.
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 0a314399d77..32ffed83e67 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -391,32 +391,41 @@ void _logOpsInner(OperationContext* opCtx,
checkOplogInsert(oplogCollection->insertDocumentsForOplog(opCtx, writers, timestamps, nDocs));
// Set replCoord last optime only after we're sure the WUOW didn't abort and roll back.
- opCtx->recoveryUnit()->onCommit([opCtx, replCoord, finalOpTime] {
-
- auto lastAppliedTimestamp = finalOpTime.getTimestamp();
- const auto storageEngine = opCtx->getServiceContext()->getGlobalStorageEngine();
- if (storageEngine->supportsDocLocking()) {
- // If the storage engine supports document level locking, then it is possible for
- // oplog writes to commit out of order. In that case, we only want to set our last
- // applied optime to the all committed timestamp to ensure that all operations earlier
- // than the last applied optime have been storage-committed. We are guaranteed that
- // whatever operation occurred at the all committed timestamp occurred during the same
- // term as 'finalOpTime'. When a primary enters a new term, it first commits a
- // 'new primary' oplog entry in the new term before accepting any new writes. This
- // will ensure that the all committed timestamp is in the new term before any client
- // writes are committed.
- lastAppliedTimestamp = storageEngine->getAllCommittedTimestamp(opCtx);
- }
+ opCtx->recoveryUnit()->onCommit(
+ [opCtx, replCoord, finalOpTime](boost::optional<Timestamp> commitTime) {
+ if (commitTime) {
+ // The `finalOpTime` may be less than the `commitTime` if multiple oplog entries
+ // are logging within one WriteUnitOfWork.
+ invariant(finalOpTime.getTimestamp() <= *commitTime,
+ str::stream() << "Final OpTime: " << finalOpTime.toString()
+ << ". Commit Time: "
+ << commitTime->toString());
+ }
+
+ auto lastAppliedTimestamp = finalOpTime.getTimestamp();
+ const auto storageEngine = opCtx->getServiceContext()->getGlobalStorageEngine();
+ if (storageEngine->supportsDocLocking()) {
+ // If the storage engine supports document level locking, then it is possible for
+ // oplog writes to commit out of order. In that case, we only want to set our last
+ // applied optime to the all committed timestamp to ensure that all operations
+ // earlier than the last applied optime have been storage-committed. We are
+ // guaranteed that whatever operation occurred at the all committed timestamp
+ // occurred during the same term as 'finalOpTime'. When a primary enters a new term,
+ // it first commits a 'new primary' oplog entry in the new term before accepting any
+ // new writes. This will ensure that the all committed timestamp is in the new term
+ // before any client writes are committed.
+ lastAppliedTimestamp = storageEngine->getAllCommittedTimestamp(opCtx);
+ }
- // Optimes on the primary should always represent consistent database states.
- replCoord->setMyLastAppliedOpTimeForward(
- OpTime(lastAppliedTimestamp, finalOpTime.getTerm()),
- ReplicationCoordinator::DataConsistency::Consistent);
+ // Optimes on the primary should always represent consistent database states.
+ replCoord->setMyLastAppliedOpTimeForward(
+ OpTime(lastAppliedTimestamp, finalOpTime.getTerm()),
+ ReplicationCoordinator::DataConsistency::Consistent);
- // We set the last op on the client to 'finalOpTime', because that contains the timestamp
- // of the operation that the client actually performed.
- ReplClientInfo::forClient(opCtx->getClient()).setLastOp(finalOpTime);
- });
+ // We set the last op on the client to 'finalOpTime', because that contains the
+ // timestamp of the operation that the client actually performed.
+ ReplClientInfo::forClient(opCtx->getClient()).setLastOp(finalOpTime);
+ });
}
OpTime logOp(OperationContext* opCtx,
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index 7b6b77beb5f..c286c37f71e 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -761,21 +761,6 @@ public:
virtual Status updateTerm(OperationContext* opCtx, long long term) = 0;
/**
- * Returns the minimum visible snapshot for this operation.
- *
- * This name is guaranteed to compare > all names reserved before and < all names reserved
- * after.
- *
- * This method will not take any locks or attempt to access storage using the passed-in
- * OperationContext. It will only be used to return reserved SnapshotNames by each operation so
- * callers can correctly wait for the reserved snapshot to be visible.
- *
- * A null OperationContext can be used in cases where the snapshot to wait for should not be
- * adjusted.
- */
- virtual Timestamp getMinimumVisibleSnapshot(OperationContext* opCtx) = 0;
-
- /**
* Blocks until either the current committed snapshot is at least as high as 'untilSnapshot',
* or we are interrupted for any reason, including shutdown or maxTimeMs expiration.
* 'opCtx' is used to checkForInterrupt and enforce maxTimeMS.
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 34b40057bd8..d0d46498f1f 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -3358,27 +3358,6 @@ EventHandle ReplicationCoordinatorImpl::_updateTerm_inlock(
return EventHandle();
}
-Timestamp ReplicationCoordinatorImpl::getMinimumVisibleSnapshot(OperationContext* opCtx) {
- Timestamp reservedName;
- if (getReplicationMode() == Mode::modeReplSet) {
- invariant(opCtx->lockState()->isLocked());
- if (getMemberState().primary() || opCtx->recoveryUnit()->getCommitTimestamp().isNull()) {
- // Use the current optime on the node, for primary nodes. Additionally, completion of
- // background index builds on secondaries will not have a `commit time` and must also
- // use the current optime.
- reservedName = LogicalClock::get(getServiceContext())->getClusterTime().asTimestamp();
- } else {
- // This function is only called when applying command operations on secondaries.
- // We ask the RecoveryUnit what timestamp it will assign to this write.
- reservedName = opCtx->recoveryUnit()->getCommitTimestamp();
- }
- } else {
- // All snapshots are the same for a standalone node.
- reservedName = Timestamp();
- }
- return reservedName;
-}
-
void ReplicationCoordinatorImpl::waitUntilSnapshotCommitted(OperationContext* opCtx,
const Timestamp& untilSnapshot) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 36a03adda66..9db6442ca68 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -288,8 +288,6 @@ public:
virtual Status updateTerm(OperationContext* opCtx, long long term) override;
- virtual Timestamp getMinimumVisibleSnapshot(OperationContext* opCtx) override;
-
virtual OpTime getCurrentCommittedSnapshotOpTime() const override;
virtual void waitUntilSnapshotCommitted(OperationContext* opCtx,
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index 437aeea1862..cc8aca366e2 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -442,10 +442,6 @@ Status ReplicationCoordinatorMock::updateTerm(OperationContext* opCtx, long long
return Status::OK();
}
-Timestamp ReplicationCoordinatorMock::getMinimumVisibleSnapshot(OperationContext* opCtx) {
- return Timestamp(_snapshotNameGenerator.addAndFetch(1));
-}
-
void ReplicationCoordinatorMock::dropAllSnapshots() {}
OpTime ReplicationCoordinatorMock::getCurrentCommittedSnapshotOpTime() const {
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index fd5eb00d58c..973c2afb15b 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -241,8 +241,6 @@ public:
virtual Status updateTerm(OperationContext* opCtx, long long term);
- virtual Timestamp getMinimumVisibleSnapshot(OperationContext* opCtx) override;
-
virtual void dropAllSnapshots() override;
virtual OpTime getCurrentCommittedSnapshotOpTime() const override;
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index b94279b35d6..be6aafbe27f 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -146,7 +146,7 @@ public:
_opTime(opTime),
_prePostImageOpTime(prePostImageOpTime) {}
- void commit() override {
+ void commit(boost::optional<Timestamp>) override {
switch (_op) {
case 'd': {
stdx::lock_guard<stdx::mutex> sl(_cloner->_mutex);
diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp
index a7bc335294d..51c9fe954c7 100644
--- a/src/mongo/db/s/shard_server_op_observer.cpp
+++ b/src/mongo/db/s/shard_server_op_observer.cpp
@@ -69,7 +69,7 @@ public:
CollectionVersionLogOpHandler(OperationContext* opCtx, const NamespaceString& nss)
: _opCtx(opCtx), _nss(nss) {}
- void commit() override {
+ void commit(boost::optional<Timestamp>) override {
invariant(_opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
CatalogCacheLoader::get(_opCtx).notifyOfCollectionVersionUpdate(_nss);
@@ -95,7 +95,7 @@ public:
ShardIdentityLogOpHandler(OperationContext* opCtx, ShardIdentityType shardIdentity)
: _opCtx(opCtx), _shardIdentity(std::move(shardIdentity)) {}
- void commit() override {
+ void commit(boost::optional<Timestamp>) override {
fassertNoTrace(
40071, ShardingState::get(_opCtx)->initializeFromShardIdentity(_opCtx, _shardIdentity));
}
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index 84021db98aa..7d129b164ed 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -1128,7 +1128,8 @@ void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx,
std::vector<StmtId> stmtIdsWritten,
const repl::OpTime& lastStmtIdWriteOpTime) {
opCtx->recoveryUnit()->onCommit(
- [ this, newTxnNumber, stmtIdsWritten = std::move(stmtIdsWritten), lastStmtIdWriteOpTime ] {
+ [ this, newTxnNumber, stmtIdsWritten = std::move(stmtIdsWritten), lastStmtIdWriteOpTime ](
+ boost::optional<Timestamp>) {
RetryableWritesStats::get(getGlobalServiceContext())
->incrementTransactionsCollectionWriteCount();
diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_btree_impl.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_btree_impl.cpp
index 4488c730431..3d212b09900 100644
--- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_btree_impl.cpp
+++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_btree_impl.cpp
@@ -486,7 +486,7 @@ private:
IndexChange(IndexSet* data, const IndexKeyEntry& entry, bool insert)
: _data(data), _entry(entry), _insert(insert) {}
- virtual void commit() {}
+ virtual void commit(boost::optional<Timestamp>) {}
virtual void rollback() {
if (_insert)
_data->erase(_entry);
diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp
index 5885244528b..f2ed2f99de2 100644
--- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp
+++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp
@@ -51,7 +51,7 @@ class EphemeralForTestRecordStore::InsertChange : public RecoveryUnit::Change {
public:
InsertChange(OperationContext* opCtx, Data* data, RecordId loc)
: _opCtx(opCtx), _data(data), _loc(loc) {}
- virtual void commit() {}
+ virtual void commit(boost::optional<Timestamp>) {}
virtual void rollback() {
stdx::lock_guard<stdx::recursive_mutex> lock(_data->recordsMutex);
@@ -77,7 +77,7 @@ public:
const EphemeralForTestRecord& rec)
: _opCtx(opCtx), _data(data), _loc(loc), _rec(rec) {}
- virtual void commit() {}
+ virtual void commit(boost::optional<Timestamp>) {}
virtual void rollback() {
stdx::lock_guard<stdx::recursive_mutex> lock(_data->recordsMutex);
@@ -107,7 +107,7 @@ public:
swap(_records, _data->records);
}
- virtual void commit() {}
+ virtual void commit(boost::optional<Timestamp>) {}
virtual void rollback() {
using std::swap;
diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp
index 97471b21ca2..e213362ffee 100644
--- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp
+++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp
@@ -40,7 +40,7 @@ namespace mongo {
void EphemeralForTestRecoveryUnit::commitUnitOfWork() {
try {
for (Changes::iterator it = _changes.begin(), end = _changes.end(); it != end; ++it) {
- (*it)->commit();
+ (*it)->commit(boost::none);
}
_changes.clear();
} catch (...) {
diff --git a/src/mongo/db/storage/kv/kv_catalog.cpp b/src/mongo/db/storage/kv/kv_catalog.cpp
index 86ff663d7c1..85ed6c7f26b 100644
--- a/src/mongo/db/storage/kv/kv_catalog.cpp
+++ b/src/mongo/db/storage/kv/kv_catalog.cpp
@@ -139,7 +139,7 @@ public:
AddIdentChange(KVCatalog* catalog, StringData ident)
: _catalog(catalog), _ident(ident.toString()) {}
- virtual void commit() {}
+ virtual void commit(boost::optional<Timestamp>) {}
virtual void rollback() {
stdx::lock_guard<stdx::mutex> lk(_catalog->_identsLock);
_catalog->_idents.erase(_ident);
@@ -154,7 +154,7 @@ public:
RemoveIdentChange(KVCatalog* catalog, StringData ident, const Entry& entry)
: _catalog(catalog), _ident(ident.toString()), _entry(entry) {}
- virtual void commit() {}
+ virtual void commit(boost::optional<Timestamp>) {}
virtual void rollback() {
stdx::lock_guard<stdx::mutex> lk(_catalog->_identsLock);
_catalog->_idents[_ident] = _entry;
diff --git a/src/mongo/db/storage/kv/kv_collection_catalog_entry.cpp b/src/mongo/db/storage/kv/kv_collection_catalog_entry.cpp
index 1171ba8467a..1163f89cf3c 100644
--- a/src/mongo/db/storage/kv/kv_collection_catalog_entry.cpp
+++ b/src/mongo/db/storage/kv/kv_collection_catalog_entry.cpp
@@ -55,7 +55,7 @@ public:
AddIndexChange(OperationContext* opCtx, KVCollectionCatalogEntry* cce, StringData ident)
: _opCtx(opCtx), _cce(cce), _ident(ident.toString()) {}
- virtual void commit() {}
+ virtual void commit(boost::optional<Timestamp>) {}
virtual void rollback() {
// Intentionally ignoring failure.
_cce->_engine->dropIdent(_opCtx, _ident).transitional_ignore();
@@ -72,7 +72,7 @@ public:
: _opCtx(opCtx), _cce(cce), _ident(ident.toString()) {}
virtual void rollback() {}
- virtual void commit() {
+ virtual void commit(boost::optional<Timestamp>) {
// Intentionally ignoring failure here. Since we've removed the metadata pointing to the
// index, we should never see it again anyway.
_cce->_engine->dropIdent(_opCtx, _ident).transitional_ignore();
diff --git a/src/mongo/db/storage/kv/kv_database_catalog_entry_base.cpp b/src/mongo/db/storage/kv/kv_database_catalog_entry_base.cpp
index 88f1b7b7e55..49868d7374a 100644
--- a/src/mongo/db/storage/kv/kv_database_catalog_entry_base.cpp
+++ b/src/mongo/db/storage/kv/kv_database_catalog_entry_base.cpp
@@ -58,7 +58,7 @@ public:
_ident(ident.toString()),
_dropOnRollback(dropOnRollback) {}
- virtual void commit() {}
+ virtual void commit(boost::optional<Timestamp>) {}
virtual void rollback() {
if (_dropOnRollback) {
// Intentionally ignoring failure
@@ -94,7 +94,7 @@ public:
_entry(entry),
_dropOnCommit(dropOnCommit) {}
- virtual void commit() {
+ virtual void commit(boost::optional<Timestamp>) {
delete _entry;
// Intentionally ignoring failure here. Since we've removed the metadata pointing to the
diff --git a/src/mongo/db/storage/kv/kv_storage_engine.cpp b/src/mongo/db/storage/kv/kv_storage_engine.cpp
index e863e1446b0..7c6fb248668 100644
--- a/src/mongo/db/storage/kv/kv_storage_engine.cpp
+++ b/src/mongo/db/storage/kv/kv_storage_engine.cpp
@@ -61,7 +61,7 @@ public:
RemoveDBChange(KVStorageEngine* engine, StringData db, KVDatabaseCatalogEntryBase* entry)
: _engine(engine), _db(db.toString()), _entry(entry) {}
- virtual void commit() {
+ virtual void commit(boost::optional<Timestamp>) {
delete _entry;
}
diff --git a/src/mongo/db/storage/mmap_v1/btree/btree_logic.cpp b/src/mongo/db/storage/mmap_v1/btree/btree_logic.cpp
index 57aba093807..dc0e4aa83e2 100644
--- a/src/mongo/db/storage/mmap_v1/btree/btree_logic.cpp
+++ b/src/mongo/db/storage/mmap_v1/btree/btree_logic.cpp
@@ -107,7 +107,7 @@ class BtreeLogic<BtreeLayout>::Builder::SetRightLeafLocChange : public RecoveryU
public:
SetRightLeafLocChange(Builder* builder, DiskLoc oldLoc) : _builder(builder), _oldLoc(oldLoc) {}
- virtual void commit() {}
+ virtual void commit(boost::optional<Timestamp>) {}
virtual void rollback() {
_builder->_rightLeafLoc = _oldLoc;
}
diff --git a/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp b/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp
index 2b235f71961..ff5b114975f 100644
--- a/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp
+++ b/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp
@@ -82,7 +82,7 @@ void DurRecoveryUnit::commitChanges() {
try {
for (Changes::const_iterator it = _changes.begin(), end = _changes.end(); it != end; ++it) {
- (*it)->commit();
+ (*it)->commit(boost::none);
}
} catch (...) {
std::terminate();
diff --git a/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h b/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h
index 1575a721f2d..b21b8cffe90 100644
--- a/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h
+++ b/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h
@@ -196,7 +196,7 @@ public:
virtual void abandonSnapshot() {}
virtual void registerChange(Change* change) {
- change->commit();
+ change->commit(boost::none);
delete change;
}
diff --git a/src/mongo/db/storage/mmap_v1/mmap_v1_database_catalog_entry.cpp b/src/mongo/db/storage/mmap_v1/mmap_v1_database_catalog_entry.cpp
index 31440e91bc5..369681a8298 100644
--- a/src/mongo/db/storage/mmap_v1/mmap_v1_database_catalog_entry.cpp
+++ b/src/mongo/db/storage/mmap_v1/mmap_v1_database_catalog_entry.cpp
@@ -122,7 +122,7 @@ public:
_entry->_removeFromCache(NULL, _ns);
}
- void commit() {}
+ void commit(boost::optional<Timestamp>) {}
private:
const std::string _ns;
@@ -145,7 +145,7 @@ public:
_catalogEntry->_collections[_ns] = _cachedEntry;
}
- void commit() {
+ void commit(boost::optional<Timestamp>) {
delete _cachedEntry;
}
diff --git a/src/mongo/db/storage/mobile/mobile_record_store.cpp b/src/mongo/db/storage/mobile/mobile_record_store.cpp
index b89ca31e6b6..9fbe1aaac42 100644
--- a/src/mongo/db/storage/mobile/mobile_record_store.cpp
+++ b/src/mongo/db/storage/mobile/mobile_record_store.cpp
@@ -681,7 +681,7 @@ class MobileRecordStore::NumRecsChange final : public RecoveryUnit::Change {
public:
NumRecsChange(MobileRecordStore* rs, int64_t diff) : _rs(rs), _diff(diff) {}
- void commit() override {}
+ void commit(boost::optional<Timestamp>) override {}
void rollback() override {
stdx::lock_guard<stdx::mutex> lock(_rs->_numRecsMutex);
@@ -718,7 +718,7 @@ class MobileRecordStore::DataSizeChange final : public RecoveryUnit::Change {
public:
DataSizeChange(MobileRecordStore* rs, int64_t diff) : _rs(rs), _diff(diff) {}
- void commit() override {}
+ void commit(boost::optional<Timestamp>) override {}
void rollback() override {
stdx::lock_guard<stdx::mutex> lock(_rs->_dataSizeMutex);
diff --git a/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp b/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp
index 16a8da49825..2d73e6b2124 100644
--- a/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp
+++ b/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp
@@ -53,7 +53,7 @@ void MobileRecoveryUnit::_commit() {
for (auto& change : _changes) {
try {
- change->commit();
+ change->commit(boost::none);
} catch (...) {
std::terminate();
}
diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h
index 224e0956ed9..04442aecd8e 100644
--- a/src/mongo/db/storage/recovery_unit.h
+++ b/src/mongo/db/storage/recovery_unit.h
@@ -254,13 +254,17 @@ public:
* that rollback() and commit() may be called after resources with a shorter lifetime than
* the WriteUnitOfWork have been freed. Each registered change will be committed or rolled
* back once.
+ *
+ * commit() handlers are passed the timestamp at which the transaction is committed. If the
+ * transaction is not committed at a particular timestamp, or if the storage engine does not
+ * support timestamps, then boost::none will be supplied for this parameter.
*/
class Change {
public:
virtual ~Change() {}
virtual void rollback() = 0;
- virtual void commit() = 0;
+ virtual void commit(boost::optional<Timestamp> commitTime) = 0;
};
/**
@@ -287,7 +291,7 @@ public:
void rollback() final {
_callback();
}
- void commit() final {}
+ void commit(boost::optional<Timestamp>) final {}
private:
Callback _callback;
@@ -307,8 +311,8 @@ public:
public:
OnCommitChange(Callback&& callback) : _callback(std::move(callback)) {}
void rollback() final {}
- void commit() final {
- _callback();
+ void commit(boost::optional<Timestamp> commitTime) final {
+ _callback(commitTime);
}
private:
diff --git a/src/mongo/db/storage/recovery_unit_noop.h b/src/mongo/db/storage/recovery_unit_noop.h
index 0dbc41de3d5..9713b6aa1ec 100644
--- a/src/mongo/db/storage/recovery_unit_noop.h
+++ b/src/mongo/db/storage/recovery_unit_noop.h
@@ -43,7 +43,7 @@ public:
void commitUnitOfWork() final {
for (auto& change : _changes) {
try {
- change->commit();
+ change->commit(boost::none);
} catch (...) {
std::terminate();
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
index bbefc5c9b11..78ca842d694 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
@@ -105,7 +105,7 @@ public:
_highestInserted(highestInserted),
_countInserted(countInserted) {}
- void commit() final {
+ void commit(boost::optional<Timestamp>) final {
invariant(_bytesInserted >= 0);
invariant(_highestInserted.isNormal());
@@ -129,7 +129,7 @@ class WiredTigerRecordStore::OplogStones::TruncateChange final : public Recovery
public:
TruncateChange(OplogStones* oplogStones) : _oplogStones(oplogStones) {}
- void commit() final {
+ void commit(boost::optional<Timestamp>) final {
_oplogStones->_currentRecords.store(0);
_oplogStones->_currentBytes.store(0);
@@ -1606,7 +1606,7 @@ WiredTigerRecoveryUnit* WiredTigerRecordStore::_getRecoveryUnit(OperationContext
class WiredTigerRecordStore::NumRecordsChange : public RecoveryUnit::Change {
public:
NumRecordsChange(WiredTigerRecordStore* rs, int64_t diff) : _rs(rs), _diff(diff) {}
- virtual void commit() {}
+ virtual void commit(boost::optional<Timestamp>) {}
virtual void rollback() {
_rs->_numRecords.fetchAndAdd(-_diff);
}
@@ -1629,7 +1629,7 @@ void WiredTigerRecordStore::_changeNumRecords(OperationContext* opCtx, int64_t d
class WiredTigerRecordStore::DataSizeChange : public RecoveryUnit::Change {
public:
DataSizeChange(WiredTigerRecordStore* rs, int64_t amount) : _rs(rs), _amount(amount) {}
- virtual void commit() {}
+ virtual void commit(boost::optional<Timestamp>) {}
virtual void rollback() {
_rs->_increaseDataSize(NULL, -_amount);
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
index 906eeb7ac6b..2396e17cd45 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
@@ -77,6 +77,11 @@ WiredTigerRecoveryUnit::~WiredTigerRecoveryUnit() {
}
void WiredTigerRecoveryUnit::_commit() {
+ // Since we cannot have both a _lastTimestampSet and a _commitTimestamp, we set the
+ // commit time as whichever is non-empty. If both are empty, then _lastTimestampSet will
+ // be boost::none and we'll set the commit time to that.
+ auto commitTime = _commitTimestamp.isNull() ? _lastTimestampSet : _commitTimestamp;
+
try {
bool notifyDone = !_prepareTimestamp.isNull();
if (_session && _active) {
@@ -92,7 +97,7 @@ void WiredTigerRecoveryUnit::_commit() {
}
for (Changes::const_iterator it = _changes.begin(), end = _changes.end(); it != end; ++it) {
- (*it)->commit();
+ (*it)->commit(commitTime);
}
_changes.clear();
@@ -273,6 +278,18 @@ void WiredTigerRecoveryUnit::_txnClose(bool commit) {
}
invariantWTOK(wtRet);
+ invariant(!_lastTimestampSet || _commitTimestamp.isNull(),
+ str::stream() << "Cannot have both a _lastTimestampSet and a "
+ "_commitTimestamp. _lastTimestampSet: "
+ << _lastTimestampSet->toString()
+ << ". _commitTimestamp: "
+ << _commitTimestamp.toString());
+
+ // We reset the _lastTimestampSet between transactions. Since it is legal for one
+ // transaction on a RecoveryUnit to call setTimestamp() and another to call
+ // setCommitTimestamp().
+ _lastTimestampSet = boost::none;
+
_active = false;
_prepareTimestamp = Timestamp();
_mySnapshotId = nextSnapshotId.fetchAndAdd(1);
@@ -382,6 +399,8 @@ Status WiredTigerRecoveryUnit::setTimestamp(Timestamp timestamp) {
<< " and trying to set WUOW timestamp to "
<< timestamp.toString());
+ _lastTimestampSet = timestamp;
+
// Starts the WT transaction associated with this session.
getSession();
@@ -399,6 +418,12 @@ void WiredTigerRecoveryUnit::setCommitTimestamp(Timestamp timestamp) {
str::stream() << "Commit timestamp set to " << _commitTimestamp.toString()
<< " and trying to set it to "
<< timestamp.toString());
+ invariant(!_lastTimestampSet,
+ str::stream() << "Last timestamp set is " << _lastTimestampSet->toString()
+ << " and trying to set commit timestamp to "
+ << timestamp.toString());
+ invariant(!_isTimestamped);
+
_commitTimestamp = timestamp;
}
@@ -409,6 +434,11 @@ Timestamp WiredTigerRecoveryUnit::getCommitTimestamp() {
void WiredTigerRecoveryUnit::clearCommitTimestamp() {
invariant(!_inUnitOfWork);
invariant(!_commitTimestamp.isNull());
+ invariant(!_lastTimestampSet,
+ str::stream() << "Last timestamp set is " << _lastTimestampSet->toString()
+ << " and trying to clear commit timestamp.");
+ invariant(!_isTimestamped);
+
_commitTimestamp = Timestamp();
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
index ff005d662bf..f679240d3df 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
@@ -162,6 +162,7 @@ private:
bool _orderedCommit = true;
Timestamp _commitTimestamp;
Timestamp _prepareTimestamp;
+ boost::optional<Timestamp> _lastTimestampSet;
uint64_t _mySnapshotId;
Timestamp _majorityCommittedSnapshot;
Timestamp _readAtTimestamp;
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp
index 2633054017b..62b9e188c34 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp
@@ -229,5 +229,253 @@ TEST_F(WiredTigerRecoveryUnitTestFixture, WriteOnADocumentBeingPreparedTriggersW
ru2->abortUnitOfWork();
}
+TEST_F(WiredTigerRecoveryUnitTestFixture,
+ ChangeIsPassedEmptyLastTimestampSetOnCommitWithNoTimestamp) {
+ boost::optional<Timestamp> commitTs = boost::none;
+ auto opCtx = clientAndCtx1.second.get();
+ {
+ WriteUnitOfWork wuow(opCtx);
+ opCtx->recoveryUnit()->onCommit(
+ [&](boost::optional<Timestamp> commitTime) { commitTs = commitTime; });
+ wuow.commit();
+ }
+ ASSERT(!commitTs);
+}
+
+TEST_F(WiredTigerRecoveryUnitTestFixture, ChangeIsPassedLastTimestampSetOnCommit) {
+ boost::optional<Timestamp> commitTs = boost::none;
+ auto opCtx = clientAndCtx1.second.get();
+ Timestamp ts1(5, 5);
+ Timestamp ts2(6, 6);
+ {
+ WriteUnitOfWork wuow(opCtx);
+ opCtx->recoveryUnit()->onCommit(
+ [&](boost::optional<Timestamp> commitTime) { commitTs = commitTime; });
+ ASSERT_OK(opCtx->recoveryUnit()->setTimestamp(ts1));
+ ASSERT(!commitTs);
+ ASSERT_OK(opCtx->recoveryUnit()->setTimestamp(ts2));
+ ASSERT(!commitTs);
+ ASSERT_OK(opCtx->recoveryUnit()->setTimestamp(ts1));
+ ASSERT(!commitTs);
+ wuow.commit();
+ ASSERT_EQ(*commitTs, ts1);
+ }
+ ASSERT_EQ(*commitTs, ts1);
+}
+
+TEST_F(WiredTigerRecoveryUnitTestFixture, ChangeIsNotPassedLastTimestampSetOnAbort) {
+ boost::optional<Timestamp> commitTs = boost::none;
+ auto opCtx = clientAndCtx1.second.get();
+ Timestamp ts1(5, 5);
+ {
+ WriteUnitOfWork wuow(opCtx);
+ opCtx->recoveryUnit()->onCommit(
+ [&](boost::optional<Timestamp> commitTime) { commitTs = commitTime; });
+ ASSERT_OK(opCtx->recoveryUnit()->setTimestamp(ts1));
+ ASSERT(!commitTs);
+ }
+ ASSERT(!commitTs);
+}
+
+TEST_F(WiredTigerRecoveryUnitTestFixture, ChangeIsPassedCommitTimestamp) {
+ boost::optional<Timestamp> commitTs = boost::none;
+ auto opCtx = clientAndCtx1.second.get();
+ Timestamp ts1(5, 5);
+
+ opCtx->recoveryUnit()->setCommitTimestamp(ts1);
+ ASSERT(!commitTs);
+
+ {
+ WriteUnitOfWork wuow(opCtx);
+ opCtx->recoveryUnit()->onCommit(
+ [&](boost::optional<Timestamp> commitTime) { commitTs = commitTime; });
+ ASSERT(!commitTs);
+ wuow.commit();
+ ASSERT_EQ(*commitTs, ts1);
+ }
+ ASSERT_EQ(*commitTs, ts1);
+}
+
+TEST_F(WiredTigerRecoveryUnitTestFixture, ChangeIsNotPassedCommitTimestampIfCleared) {
+ boost::optional<Timestamp> commitTs = boost::none;
+ auto opCtx = clientAndCtx1.second.get();
+ Timestamp ts1(5, 5);
+
+ opCtx->recoveryUnit()->setCommitTimestamp(ts1);
+ ASSERT(!commitTs);
+ opCtx->recoveryUnit()->clearCommitTimestamp();
+ ASSERT(!commitTs);
+
+ {
+ WriteUnitOfWork wuow(opCtx);
+ opCtx->recoveryUnit()->onCommit(
+ [&](boost::optional<Timestamp> commitTime) { commitTs = commitTime; });
+ ASSERT(!commitTs);
+ wuow.commit();
+ }
+ ASSERT(!commitTs);
+}
+
+TEST_F(WiredTigerRecoveryUnitTestFixture, ChangeIsPassedNewestCommitTimestamp) {
+ boost::optional<Timestamp> commitTs = boost::none;
+ auto opCtx = clientAndCtx1.second.get();
+ Timestamp ts1(5, 5);
+ Timestamp ts2(6, 6);
+
+ opCtx->recoveryUnit()->setCommitTimestamp(ts2);
+ ASSERT(!commitTs);
+ opCtx->recoveryUnit()->clearCommitTimestamp();
+ ASSERT(!commitTs);
+ opCtx->recoveryUnit()->setCommitTimestamp(ts1);
+ ASSERT(!commitTs);
+
+ {
+ WriteUnitOfWork wuow(opCtx);
+ opCtx->recoveryUnit()->onCommit(
+ [&](boost::optional<Timestamp> commitTime) { commitTs = commitTime; });
+ ASSERT(!commitTs);
+ wuow.commit();
+ ASSERT_EQ(*commitTs, ts1);
+ }
+ ASSERT_EQ(*commitTs, ts1);
+}
+
+TEST_F(WiredTigerRecoveryUnitTestFixture, ChangeIsNotPassedCommitTimestampOnAbort) {
+ boost::optional<Timestamp> commitTs = boost::none;
+ auto opCtx = clientAndCtx1.second.get();
+ Timestamp ts1(5, 5);
+
+ opCtx->recoveryUnit()->setCommitTimestamp(ts1);
+ ASSERT(!commitTs);
+
+ {
+ WriteUnitOfWork wuow(opCtx);
+ opCtx->recoveryUnit()->onCommit(
+ [&](boost::optional<Timestamp> commitTime) { commitTs = commitTime; });
+ ASSERT(!commitTs);
+ }
+ ASSERT(!commitTs);
+}
+
+TEST_F(WiredTigerRecoveryUnitTestFixture, CommitTimestampBeforeSetTimestampOnCommit) {
+ boost::optional<Timestamp> commitTs = boost::none;
+ auto opCtx = clientAndCtx1.second.get();
+ Timestamp ts1(5, 5);
+ Timestamp ts2(6, 6);
+
+ opCtx->recoveryUnit()->setCommitTimestamp(ts2);
+ ASSERT(!commitTs);
+
+ {
+ WriteUnitOfWork wuow(opCtx);
+ opCtx->recoveryUnit()->onCommit(
+ [&](boost::optional<Timestamp> commitTime) { commitTs = commitTime; });
+ ASSERT(!commitTs);
+ wuow.commit();
+ ASSERT_EQ(*commitTs, ts2);
+ }
+ ASSERT_EQ(*commitTs, ts2);
+ opCtx->recoveryUnit()->clearCommitTimestamp();
+
+ {
+ WriteUnitOfWork wuow(opCtx);
+ opCtx->recoveryUnit()->onCommit(
+ [&](boost::optional<Timestamp> commitTime) { commitTs = commitTime; });
+ ASSERT_OK(opCtx->recoveryUnit()->setTimestamp(ts1));
+ ASSERT_EQ(*commitTs, ts2);
+ wuow.commit();
+ ASSERT_EQ(*commitTs, ts1);
+ }
+ ASSERT_EQ(*commitTs, ts1);
+}
+
+TEST_F(WiredTigerRecoveryUnitTestFixture, CommitTimestampAfterSetTimestampOnCommit) {
+ boost::optional<Timestamp> commitTs = boost::none;
+ auto opCtx = clientAndCtx1.second.get();
+ Timestamp ts1(5, 5);
+ Timestamp ts2(6, 6);
+
+ {
+ WriteUnitOfWork wuow(opCtx);
+ opCtx->recoveryUnit()->onCommit(
+ [&](boost::optional<Timestamp> commitTime) { commitTs = commitTime; });
+ ASSERT(!commitTs);
+ ASSERT_OK(opCtx->recoveryUnit()->setTimestamp(ts2));
+ ASSERT(!commitTs);
+ wuow.commit();
+ ASSERT_EQ(*commitTs, ts2);
+ }
+ ASSERT_EQ(*commitTs, ts2);
+
+ opCtx->recoveryUnit()->setCommitTimestamp(ts1);
+ ASSERT_EQ(*commitTs, ts2);
+
+ {
+ WriteUnitOfWork wuow(opCtx);
+ opCtx->recoveryUnit()->onCommit(
+ [&](boost::optional<Timestamp> commitTime) { commitTs = commitTime; });
+ ASSERT_EQ(*commitTs, ts2);
+ wuow.commit();
+ ASSERT_EQ(*commitTs, ts1);
+ }
+ ASSERT_EQ(*commitTs, ts1);
+}
+
+TEST_F(WiredTigerRecoveryUnitTestFixture, CommitTimestampBeforeSetTimestampOnAbort) {
+ boost::optional<Timestamp> commitTs = boost::none;
+ auto opCtx = clientAndCtx1.second.get();
+ Timestamp ts1(5, 5);
+ Timestamp ts2(6, 6);
+
+ opCtx->recoveryUnit()->setCommitTimestamp(ts2);
+ ASSERT(!commitTs);
+
+ {
+ WriteUnitOfWork wuow(opCtx);
+ opCtx->recoveryUnit()->onCommit(
+ [&](boost::optional<Timestamp> commitTime) { commitTs = commitTime; });
+ ASSERT(!commitTs);
+ }
+ ASSERT(!commitTs);
+ opCtx->recoveryUnit()->clearCommitTimestamp();
+
+ {
+ WriteUnitOfWork wuow(opCtx);
+ opCtx->recoveryUnit()->onCommit(
+ [&](boost::optional<Timestamp> commitTime) { commitTs = commitTime; });
+ ASSERT_OK(opCtx->recoveryUnit()->setTimestamp(ts1));
+ ASSERT(!commitTs);
+ }
+ ASSERT(!commitTs);
+}
+
+TEST_F(WiredTigerRecoveryUnitTestFixture, CommitTimestampAfterSetTimestampOnAbort) {
+ boost::optional<Timestamp> commitTs = boost::none;
+ auto opCtx = clientAndCtx1.second.get();
+ Timestamp ts1(5, 5);
+ Timestamp ts2(6, 6);
+
+ {
+ WriteUnitOfWork wuow(opCtx);
+ opCtx->recoveryUnit()->onCommit(
+ [&](boost::optional<Timestamp> commitTime) { commitTs = commitTime; });
+ ASSERT(!commitTs);
+ ASSERT_OK(opCtx->recoveryUnit()->setTimestamp(ts2));
+ ASSERT(!commitTs);
+ }
+ ASSERT(!commitTs);
+
+ opCtx->recoveryUnit()->setCommitTimestamp(ts1);
+ ASSERT(!commitTs);
+
+ {
+ WriteUnitOfWork wuow(opCtx);
+ opCtx->recoveryUnit()->onCommit(
+ [&](boost::optional<Timestamp> commitTime) { commitTs = commitTime; });
+ ASSERT(!commitTs);
+ }
+ ASSERT(!commitTs);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/views/durable_view_catalog.cpp b/src/mongo/db/views/durable_view_catalog.cpp
index 199c201f07c..b0076f03926 100644
--- a/src/mongo/db/views/durable_view_catalog.cpp
+++ b/src/mongo/db/views/durable_view_catalog.cpp
@@ -57,7 +57,8 @@ void DurableViewCatalog::onExternalChange(OperationContext* opCtx, const Namespa
Database* db = DatabaseHolder::getDatabaseHolder().get(opCtx, name.db());
if (db) {
- opCtx->recoveryUnit()->onCommit([db]() { db->getViewCatalog()->invalidate(); });
+ opCtx->recoveryUnit()->onCommit(
+ [db](boost::optional<Timestamp>) { db->getViewCatalog()->invalidate(); });
}
}
diff --git a/src/mongo/db/views/view_catalog.cpp b/src/mongo/db/views/view_catalog.cpp
index 1a029f0d5c4..16c660d8e5c 100644
--- a/src/mongo/db/views/view_catalog.cpp
+++ b/src/mongo/db/views/view_catalog.cpp
@@ -162,7 +162,8 @@ Status ViewCatalog::_createOrUpdateView_inlock(OperationContext* opCtx,
});
// We may get invalidated, but we're exclusively locked, so the change must be ours.
- opCtx->recoveryUnit()->onCommit([this]() { this->_valid.store(true); });
+ opCtx->recoveryUnit()->onCommit(
+ [this](boost::optional<Timestamp>) { this->_valid.store(true); });
return Status::OK();
}
@@ -377,7 +378,8 @@ Status ViewCatalog::dropView(OperationContext* opCtx, const NamespaceString& vie
});
// We may get invalidated, but we're exclusively locked, so the change must be ours.
- opCtx->recoveryUnit()->onCommit([this]() { this->_valid.store(true); });
+ opCtx->recoveryUnit()->onCommit(
+ [this](boost::optional<Timestamp>) { this->_valid.store(true); });
return Status::OK();
}
diff --git a/src/mongo/scripting/engine.cpp b/src/mongo/scripting/engine.cpp
index 364f89e7b25..e22192fe983 100644
--- a/src/mongo/scripting/engine.cpp
+++ b/src/mongo/scripting/engine.cpp
@@ -187,7 +187,7 @@ bool Scope::execFile(const string& filename, bool printResult, bool reportError,
class Scope::StoredFuncModLogOpHandler : public RecoveryUnit::Change {
public:
- void commit() {
+ void commit(boost::optional<Timestamp>) {
_lastVersion.fetchAndAdd(1);
}
void rollback() {}