author      Eric Milkie <milkie@10gen.com>    2017-04-06 15:30:59 -0400
committer   Eric Milkie <milkie@10gen.com>    2017-08-22 13:45:12 -0400
commit      77dc6917428ffad4b9ff2d54d78fa9b225f78a4b (patch)
tree        fa2483cb4214fd6858db09ca80523751f65888f5 /src/mongo
parent      b1a36aaa34f48df1573d76439419552282f18cbf (diff)
download    mongo-77dc6917428ffad4b9ff2d54d78fa9b225f78a4b.tar.gz
SERVER-28620 Adorn all oplog writes with timestamps
These timestamps are now used to implement oplog visibility rules, replacing the in-memory vector of uncommitted ops that the WiredTiger glue code currently uses. This change also introduces a TimestampedBSONObj struct, which encapsulates a BSONObj together with its associated write timestamp.
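For orientation, the hunks below change StorageInterface::insertDocument (and related insert paths) to take a TimestampedBSONObj, i.e. a document paired with the SnapshotName timestamp at which the write should be stamped, instead of a bare BSONObj. The following is only a minimal sketch of that calling convention: BSONObj, SnapshotName, and insertDocument here are simplified stand-ins so the fragment compiles on its own; the real signatures are the ones shown in the diff.

// Sketch only: simplified stand-ins illustrating how callers now pair each
// document with the timestamp it should be written at.
#include <cstdint>
#include <iostream>
#include <string>

struct BSONObj {            // stand-in for mongo::BSONObj
    std::string json;
};

struct SnapshotName {       // stand-in for mongo::SnapshotName (wraps a timestamp value)
    explicit SnapshotName(std::uint64_t v = 0) : value(v) {}
    std::uint64_t value;
};

// Mirrors the repl::TimestampedBSONObj struct introduced by this commit.
struct TimestampedBSONObj {
    BSONObj obj;
    SnapshotName timestamp;
};

// Stand-in for StorageInterface::insertDocument: the caller now supplies the
// write timestamp alongside the document instead of just the document.
void insertDocument(const std::string& nss, const TimestampedBSONObj& doc) {
    std::cout << "insert into " << nss << ": " << doc.obj.json
              << " @ts=" << doc.timestamp.value << "\n";
}

int main() {
    // An oplog-style write carries its own timestamp...
    insertDocument("local.oplog.rs",
                   TimestampedBSONObj{BSONObj{"{ts: Timestamp(1, 1), op: 'n'}"},
                                      SnapshotName(1)});
    // ...while unreplicated writes pass a default (zero) timestamp, as the
    // FeatureCompatibilityVersion and rollback-id call sites in this diff do.
    insertDocument("admin.system.version",
                   TimestampedBSONObj{BSONObj{"{_id: 'featureCompatibilityVersion'}"},
                                      SnapshotName()});
    return 0;
}

In the actual patch the same pairing flows down through InsertStatement and into the record-store insert calls, so the storage layer can stamp each record with its oplog timestamp.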
Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/base/string_data.h | 12
-rw-r--r--  src/mongo/db/catalog/collection.h | 8
-rw-r--r--  src/mongo/db/catalog/collection_impl.cpp | 19
-rw-r--r--  src/mongo/db/catalog/collection_impl.h | 1
-rw-r--r--  src/mongo/db/catalog/collection_mock.h | 1
-rw-r--r--  src/mongo/db/commands/feature_compatibility_version.cpp | 9
-rw-r--r--  src/mongo/db/commands/snapshot_management.cpp | 2
-rw-r--r--  src/mongo/db/repl/databases_cloner_test.cpp | 10
-rw-r--r--  src/mongo/db/repl/initial_syncer.cpp | 11
-rw-r--r--  src/mongo/db/repl/initial_syncer_test.cpp | 24
-rw-r--r--  src/mongo/db/repl/oplog.cpp | 68
-rw-r--r--  src/mongo/db/repl/oplog_buffer_collection_test.cpp | 44
-rw-r--r--  src/mongo/db/repl/replication_consistency_markers_impl_test.cpp | 13
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state.cpp | 7
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state.h | 27
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 38
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.h | 2
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_mock.cpp | 2
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_mock.h | 2
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp | 9
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.h | 25
-rw-r--r--  src/mongo/db/repl/replication_process_test.cpp | 38
-rw-r--r--  src/mongo/db/repl/replication_recovery_test.cpp | 13
-rw-r--r--  src/mongo/db/repl/storage_interface.h | 13
-rw-r--r--  src/mongo/db/repl/storage_interface_impl.cpp | 7
-rw-r--r--  src/mongo/db/repl/storage_interface_impl.h | 2
-rw-r--r--  src/mongo/db/repl/storage_interface_impl_test.cpp | 273
-rw-r--r--  src/mongo/db/repl/storage_interface_mock.h | 6
-rw-r--r--  src/mongo/db/repl/sync_tail.cpp | 9
-rw-r--r--  src/mongo/db/storage/README.md | 29
-rw-r--r--  src/mongo/db/storage/kv/kv_engine.h | 5
-rw-r--r--  src/mongo/db/storage/kv/kv_engine_test_snapshots.cpp | 22
-rw-r--r--  src/mongo/db/storage/kv/kv_storage_engine.cpp | 3
-rw-r--r--  src/mongo/db/storage/kv/kv_storage_engine.h | 2
-rw-r--r--  src/mongo/db/storage/record_store.h | 45
-rw-r--r--  src/mongo/db/storage/record_store_test_capped_visibility.cpp | 18
-rw-r--r--  src/mongo/db/storage/record_store_test_harness.cpp | 3
-rw-r--r--  src/mongo/db/storage/record_store_test_insertrecord.cpp | 6
-rw-r--r--  src/mongo/db/storage/recovery_unit.h | 20
-rw-r--r--  src/mongo/db/storage/snapshot_manager.h | 2
-rw-r--r--  src/mongo/db/storage/snapshot_name.h | 3
-rw-r--r--  src/mongo/db/storage/storage_engine.h | 9
-rw-r--r--  src/mongo/db/storage/wiredtiger/SConscript | 3
-rw-r--r--  src/mongo/db/storage/wiredtiger/record_store_test_oplog.cpp | 319
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp | 32
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h | 35
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp | 221
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h | 97
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_prefixed_record_store_test.cpp | 63
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp | 361
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h | 76
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp | 355
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp | 47
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h | 52
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp | 1
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h | 8
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.cpp | 34
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.h | 17
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp | 63
-rw-r--r--  src/mongo/dbtests/repltests.cpp | 23
-rw-r--r--  src/mongo/unittest/temp_dir.h | 2
61 files changed, 1648 insertions, 1023 deletions
diff --git a/src/mongo/base/string_data.h b/src/mongo/base/string_data.h
index 029da4a2cee..7d896e7ecaf 100644
--- a/src/mongo/base/string_data.h
+++ b/src/mongo/base/string_data.h
@@ -162,30 +162,30 @@ public:
* null-terminated, so if using this without checking size(), you are likely doing
* something wrong.
*/
- const char* rawData() const {
+ constexpr const char* rawData() const {
return _data;
}
- size_t size() const {
+ constexpr size_t size() const {
return _size;
}
- bool empty() const {
+ constexpr bool empty() const {
return size() == 0;
}
std::string toString() const {
return std::string(_data, size());
}
- char operator[](unsigned pos) const {
+ constexpr char operator[](unsigned pos) const {
return _data[pos];
}
//
// iterators
//
- const_iterator begin() const {
+ constexpr const_iterator begin() const {
return rawData();
}
- const_iterator end() const {
+ constexpr const_iterator end() const {
return rawData() + size();
}
diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h
index 4b257571860..ee44eb2d14f 100644
--- a/src/mongo/db/catalog/collection.h
+++ b/src/mongo/db/catalog/collection.h
@@ -102,8 +102,12 @@ public:
explicit InsertStatement(BSONObj toInsert) : doc(toInsert) {}
InsertStatement(StmtId statementId, BSONObj toInsert) : stmtId(statementId), doc(toInsert) {}
+ InsertStatement(StmtId statementId, BSONObj toInsert, SnapshotName ts)
+ : stmtId(statementId), timestamp(ts), doc(toInsert) {}
+ InsertStatement(BSONObj toInsert, SnapshotName ts) : timestamp(ts), doc(toInsert) {}
StmtId stmtId = kUninitializedStmtId;
+ SnapshotName timestamp = SnapshotName();
BSONObj doc;
};
@@ -265,6 +269,7 @@ public:
virtual Status insertDocumentsForOplog(OperationContext* opCtx,
const DocWriter* const* docs,
+ Timestamp* timestamps,
size_t nDocs) = 0;
virtual Status insertDocument(OperationContext* opCtx,
@@ -518,8 +523,9 @@ public:
*/
inline Status insertDocumentsForOplog(OperationContext* const opCtx,
const DocWriter* const* const docs,
+ Timestamp* timestamps,
const size_t nDocs) {
- return this->_impl().insertDocumentsForOplog(opCtx, docs, nDocs);
+ return this->_impl().insertDocumentsForOplog(opCtx, docs, timestamps, nDocs);
}
/**
diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp
index 7333330c4cc..58b04a89122 100644
--- a/src/mongo/db/catalog/collection_impl.cpp
+++ b/src/mongo/db/catalog/collection_impl.cpp
@@ -321,6 +321,7 @@ StatusWithMatchExpression CollectionImpl::parseValidator(const BSONObj& validato
Status CollectionImpl::insertDocumentsForOplog(OperationContext* opCtx,
const DocWriter* const* docs,
+ Timestamp* timestamps,
size_t nDocs) {
dassert(opCtx->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IX));
@@ -331,7 +332,7 @@ Status CollectionImpl::insertDocumentsForOplog(OperationContext* opCtx,
invariant(!_indexCatalog.haveAnyIndexes());
invariant(!_mustTakeCappedLockOnInsert);
- Status status = _recordStore->insertRecordsWithDocWriter(opCtx, docs, nDocs);
+ Status status = _recordStore->insertRecordsWithDocWriterT(opCtx, docs, timestamps, nDocs);
if (!status.isOK())
return status;
@@ -433,7 +434,8 @@ Status CollectionImpl::insertDocument(OperationContext* opCtx,
if (_mustTakeCappedLockOnInsert)
synchronizeOnCappedInFlightResource(opCtx->lockState(), _ns);
-
+ // TODO SERVER-30638: using timestamp 0 for these inserts, which are non-oplog so we don't yet
+ // care about their correct timestamps.
StatusWith<RecordId> loc = _recordStore->insertRecord(
opCtx, doc.objdata(), doc.objsize(), _enforceQuota(enforceQuota));
@@ -485,11 +487,17 @@ Status CollectionImpl::_insertDocuments(OperationContext* opCtx,
std::vector<Record> records;
records.reserve(count);
+ std::vector<Timestamp> timestamps;
+ timestamps.reserve(count);
+
for (auto it = begin; it != end; it++) {
Record record = {RecordId(), RecordData(it->doc.objdata(), it->doc.objsize())};
records.push_back(record);
+ Timestamp timestamp = Timestamp(it->timestamp.asU64());
+ timestamps.push_back(timestamp);
}
- Status status = _recordStore->insertRecords(opCtx, &records, _enforceQuota(enforceQuota));
+ Status status =
+ _recordStore->insertRecordsT(opCtx, &records, &timestamps, _enforceQuota(enforceQuota));
if (!status.isOK())
return status;
@@ -710,8 +718,9 @@ StatusWith<RecordId> CollectionImpl::_updateDocumentWithMove(OperationContext* o
OplogUpdateEntryArgs* args,
const SnapshotId& sid) {
// Insert new record.
- StatusWith<RecordId> newLocation = _recordStore->insertRecord(
- opCtx, newDoc.objdata(), newDoc.objsize(), _enforceQuota(enforceQuota));
+ // TODO SERVER-30638, thread through actual timestamps.
+ StatusWith<RecordId> newLocation = _recordStore->insertRecordT(
+ opCtx, newDoc.objdata(), newDoc.objsize(), Timestamp(), _enforceQuota(enforceQuota));
if (!newLocation.isOK()) {
return newLocation;
}
diff --git a/src/mongo/db/catalog/collection_impl.h b/src/mongo/db/catalog/collection_impl.h
index 9e5ca69270c..89262f9f7cc 100644
--- a/src/mongo/db/catalog/collection_impl.h
+++ b/src/mongo/db/catalog/collection_impl.h
@@ -190,6 +190,7 @@ public:
*/
Status insertDocumentsForOplog(OperationContext* opCtx,
const DocWriter* const* docs,
+ Timestamp* timestamps,
size_t nDocs) final;
/**
diff --git a/src/mongo/db/catalog/collection_mock.h b/src/mongo/db/catalog/collection_mock.h
index ad836035f1b..a4bb7143e31 100644
--- a/src/mongo/db/catalog/collection_mock.h
+++ b/src/mongo/db/catalog/collection_mock.h
@@ -157,6 +157,7 @@ public:
Status insertDocumentsForOplog(OperationContext* opCtx,
const DocWriter* const* docs,
+ Timestamp* timestamps,
size_t nDocs) {
std::abort();
}
diff --git a/src/mongo/db/commands/feature_compatibility_version.cpp b/src/mongo/db/commands/feature_compatibility_version.cpp
index 6647f2b7099..fedeed17dc1 100644
--- a/src/mongo/db/commands/feature_compatibility_version.cpp
+++ b/src/mongo/db/commands/feature_compatibility_version.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
#include "mongo/db/storage/storage_engine.h"
@@ -227,9 +228,11 @@ void FeatureCompatibilityVersion::setIfCleanStartup(OperationContext* opCtx,
uassertStatusOK(storageInterface->insertDocument(
opCtx,
nss,
- BSON("_id" << FeatureCompatibilityVersion::kParameterName
- << FeatureCompatibilityVersion::kVersionField
- << FeatureCompatibilityVersionCommandParser::kVersion36)));
+ repl::TimestampedBSONObj{
+ BSON("_id" << FeatureCompatibilityVersion::kParameterName
+ << FeatureCompatibilityVersion::kVersionField
+ << FeatureCompatibilityVersionCommandParser::kVersion36),
+ SnapshotName()})); // No timestamp because this write is not replicated.
}
}
diff --git a/src/mongo/db/commands/snapshot_management.cpp b/src/mongo/db/commands/snapshot_management.cpp
index 8fd1742a792..4f797cf0700 100644
--- a/src/mongo/db/commands/snapshot_management.cpp
+++ b/src/mongo/db/commands/snapshot_management.cpp
@@ -124,7 +124,7 @@ public:
Lock::GlobalLock lk(opCtx, MODE_IX, UINT_MAX);
auto name = SnapshotName(cmdObj.firstElement().Long());
- snapshotManager->setCommittedSnapshot(name);
+ snapshotManager->setCommittedSnapshot(name, Timestamp(name.asU64()));
return true;
}
};
diff --git a/src/mongo/db/repl/databases_cloner_test.cpp b/src/mongo/db/repl/databases_cloner_test.cpp
index 58664a44907..85a5d7e5795 100644
--- a/src/mongo/db/repl/databases_cloner_test.cpp
+++ b/src/mongo/db/repl/databases_cloner_test.cpp
@@ -146,11 +146,11 @@ protected:
_storageInterfaceWorkDone.createOplogCalled = true;
return Status::OK();
};
- _storageInterface.insertDocumentFn =
- [this](OperationContext* opCtx, const NamespaceString& nss, const BSONObj& doc) {
- ++_storageInterfaceWorkDone.documentsInsertedCount;
- return Status::OK();
- };
+ _storageInterface.insertDocumentFn = [this](
+ OperationContext* opCtx, const NamespaceString& nss, const TimestampedBSONObj& doc) {
+ ++_storageInterfaceWorkDone.documentsInsertedCount;
+ return Status::OK();
+ };
_storageInterface.insertDocumentsFn = [this](OperationContext* opCtx,
const NamespaceString& nss,
const std::vector<InsertStatement>& ops) {
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index 94997d45293..0cf95dfe417 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -801,6 +801,7 @@ void InitialSyncer::_databasesClonerCallback(const Status& databaseClonerFinishS
void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp(
const StatusWith<Fetcher::QueryResponse>& result,
std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
+ Timestamp oplogSeedDocTimestamp;
{
stdx::lock_guard<stdx::mutex> lock(_mutex);
auto status = _checkForShutdownAndConvertStatus_inlock(
@@ -817,7 +818,8 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp(
return;
}
auto&& optimeWithHash = optimeWithHashStatus.getValue();
- _initialSyncState->stopTimestamp = optimeWithHash.opTime.getTimestamp();
+ oplogSeedDocTimestamp = _initialSyncState->stopTimestamp =
+ optimeWithHash.opTime.getTimestamp();
if (_initialSyncState->beginTimestamp == _initialSyncState->stopTimestamp) {
_lastApplied = optimeWithHash;
@@ -835,7 +837,7 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp(
{
const auto& documents = result.getValue().documents;
invariant(!documents.empty());
- const auto& oplogSeedDoc = documents.front();
+ const BSONObj oplogSeedDoc = documents.front();
LOG(2) << "Inserting oplog seed document: " << oplogSeedDoc;
auto opCtx = makeOpCtx();
@@ -843,7 +845,10 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp(
// override its behavior in tests. See InitialSyncerReturnsCallbackCanceledAndDoesNot-
// ScheduleRollbackCheckerIfShutdownAfterInsertingInsertOplogSeedDocument in
// initial_syncer_test.cpp
- auto status = _storage->insertDocument(opCtx.get(), _opts.localOplogNS, oplogSeedDoc);
+ auto status = _storage->insertDocument(
+ opCtx.get(),
+ _opts.localOplogNS,
+ TimestampedBSONObj{oplogSeedDoc, SnapshotName(oplogSeedDocTimestamp)});
if (!status.isOK()) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 29acdf41793..274678a10bf 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -235,12 +235,12 @@ protected:
_storageInterfaceWorkDone.createOplogCalled = true;
return Status::OK();
};
- _storageInterface->insertDocumentFn =
- [this](OperationContext* opCtx, const NamespaceString& nss, const BSONObj& doc) {
- LockGuard lock(_storageInterfaceWorkDoneMutex);
- ++_storageInterfaceWorkDone.documentsInsertedCount;
- return Status::OK();
- };
+ _storageInterface->insertDocumentFn = [this](
+ OperationContext* opCtx, const NamespaceString& nss, const TimestampedBSONObj& doc) {
+ LockGuard lock(_storageInterfaceWorkDoneMutex);
+ ++_storageInterfaceWorkDone.documentsInsertedCount;
+ return Status::OK();
+ };
_storageInterface->insertDocumentsFn = [this](OperationContext* opCtx,
const NamespaceString& nss,
const std::vector<InsertStatement>& ops) {
@@ -2046,9 +2046,9 @@ TEST_F(
auto opCtx = makeOpCtx();
NamespaceString insertDocumentNss;
- BSONObj insertDocumentDoc;
+ TimestampedBSONObj insertDocumentDoc;
_storageInterface->insertDocumentFn = [&insertDocumentDoc, &insertDocumentNss](
- OperationContext*, const NamespaceString& nss, const BSONObj& doc) {
+ OperationContext*, const NamespaceString& nss, const TimestampedBSONObj& doc) {
insertDocumentNss = nss;
insertDocumentDoc = doc;
return Status(ErrorCodes::OperationFailed, "failed to insert oplog entry");
@@ -2095,7 +2095,7 @@ TEST_F(
initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::OperationFailed, _lastApplied);
ASSERT_EQUALS(_options.localOplogNS, insertDocumentNss);
- ASSERT_BSONOBJ_EQ(oplogEntry, insertDocumentDoc);
+ ASSERT_BSONOBJ_EQ(oplogEntry, insertDocumentDoc.obj);
}
TEST_F(
@@ -2105,9 +2105,9 @@ TEST_F(
auto opCtx = makeOpCtx();
NamespaceString insertDocumentNss;
- BSONObj insertDocumentDoc;
+ TimestampedBSONObj insertDocumentDoc;
_storageInterface->insertDocumentFn = [initialSyncer, &insertDocumentDoc, &insertDocumentNss](
- OperationContext*, const NamespaceString& nss, const BSONObj& doc) {
+ OperationContext*, const NamespaceString& nss, const TimestampedBSONObj& doc) {
insertDocumentNss = nss;
insertDocumentDoc = doc;
initialSyncer->shutdown().transitional_ignore();
@@ -2155,7 +2155,7 @@ TEST_F(
initialSyncer->join();
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _lastApplied);
ASSERT_EQUALS(_options.localOplogNS, insertDocumentNss);
- ASSERT_BSONOBJ_EQ(oplogEntry, insertDocumentDoc);
+ ASSERT_BSONOBJ_EQ(oplogEntry, insertDocumentDoc.obj);
}
TEST_F(
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 3aead459d1d..428a064b437 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -383,17 +383,23 @@ OplogDocWriter _logOpWriter(OperationContext* opCtx,
"u" update
"d" delete
"c" db cmd
- "db" declares presence of a database (ns is set to the db name + '.')
+ "db" declares presence of a database (ns is set to the db name + '.') (master/slave only)
"n" no op
-
- bb param:
- if not null, specifies a boolean to pass along to the other side as b: param.
- used for "justOne" or "upsert" flags on 'd', 'u'
*/
+
+
+/*
+ * writers - an array with size nDocs of DocWriter objects.
+ * timestamps - an array with size nDocs of respective Timestamp objects for each DocWriter.
+ * oplogCollection - collection to be written to.
+ * replicationMode - ReplSet or MasterSlave.
+ * finalOpTime - the OpTime of the last DocWriter object.
+ */
void _logOpsInner(OperationContext* opCtx,
const NamespaceString& nss,
const DocWriter* const* writers,
- size_t nWriters,
+ Timestamp* timestamps,
+ size_t nDocs,
Collection* oplogCollection,
ReplicationCoordinator::Mode replicationMode,
OpTime finalOpTime) {
@@ -407,7 +413,7 @@ void _logOpsInner(OperationContext* opCtx,
// we jump through a bunch of hoops here to avoid copying the obj buffer twice --
// instead we do a single copy to the destination in the record store.
- checkOplogInsert(oplogCollection->insertDocumentsForOplog(opCtx, writers, nWriters));
+ checkOplogInsert(oplogCollection->insertDocumentsForOplog(opCtx, writers, timestamps, nDocs));
// Set replCoord last optime only after we're sure the WUOW didn't abort and roll back.
opCtx->recoveryUnit()->onCommit([opCtx, replCoord, finalOpTime] {
@@ -436,6 +442,7 @@ OpTime logOp(OperationContext* opCtx,
Lock::CollectionLock lock(opCtx->lockState(), _oplogCollectionName, MODE_IX);
auto replMode = replCoord->getReplicationMode();
OplogSlot slot;
+ WriteUnitOfWork wuow(opCtx);
getNextOpTime(opCtx, oplog, replCoord, replMode, 1, &slot);
Timestamp prevTs;
@@ -457,7 +464,9 @@ OpTime logOp(OperationContext* opCtx,
prevTs,
preAndPostTs);
const DocWriter* basePtr = &writer;
- _logOpsInner(opCtx, nss, &basePtr, 1, oplog, replMode, slot.opTime);
+ auto timestamp = slot.opTime.getTimestamp();
+ _logOpsInner(opCtx, nss, &basePtr, &timestamp, 1, oplog, replMode, slot.opTime);
+ wuow.commit();
return slot.opTime;
}
@@ -483,6 +492,8 @@ repl::OpTime logInsertOps(OperationContext* opCtx,
Lock::CollectionLock lock(opCtx->lockState(), _oplogCollectionName, MODE_IX);
std::unique_ptr<OplogSlot[]> slots(new OplogSlot[count]);
auto replMode = replCoord->getReplicationMode();
+
+ WriteUnitOfWork wuow(opCtx);
getNextOpTime(opCtx, oplog, replCoord, replMode, count, slots.get());
auto wallTime = Date_t::now();
@@ -491,6 +502,7 @@ repl::OpTime logInsertOps(OperationContext* opCtx,
prevTs = OperationContextSession::get(opCtx)->getLastWriteOpTimeTs();
}
+ auto timestamps = stdx::make_unique<Timestamp[]>(count);
for (size_t i = 0; i < count; i++) {
auto insertStatement = begin[i];
writers.emplace_back(_logOpWriter(opCtx,
@@ -507,14 +519,22 @@ repl::OpTime logInsertOps(OperationContext* opCtx,
prevTs,
{}));
prevTs = slots[i].opTime.getTimestamp();
+ timestamps[i] = slots[i].opTime.getTimestamp();
}
std::unique_ptr<DocWriter const* []> basePtrs(new DocWriter const*[count]);
for (size_t i = 0; i < count; i++) {
basePtrs[i] = &writers[i];
}
- _logOpsInner(opCtx, nss, basePtrs.get(), count, oplog, replMode, slots[count - 1].opTime);
-
+ _logOpsInner(opCtx,
+ nss,
+ basePtrs.get(),
+ timestamps.get(),
+ count,
+ oplog,
+ replMode,
+ slots[count - 1].opTime);
+ wuow.commit();
return slots[count - 1].opTime;
}
@@ -909,20 +929,25 @@ Status applyOperation_inlock(OperationContext* opCtx,
OpCounters* opCounters = opCtx->writesAreReplicated() ? &globalOpCounters : &replOpCounters;
- std::array<StringData, 6> names = {"o", "ui", "ns", "op", "b", "o2"};
- std::array<BSONElement, 6> fields;
+ std::array<StringData, 7> names = {"ts", "o", "ui", "ns", "op", "b", "o2"};
+ std::array<BSONElement, 7> fields;
op.getFields(names, &fields);
- BSONElement& fieldO = fields[0];
- BSONElement& fieldUI = fields[1];
- BSONElement& fieldNs = fields[2];
- BSONElement& fieldOp = fields[3];
- BSONElement& fieldB = fields[4];
- BSONElement& fieldO2 = fields[5];
+ BSONElement& fieldTs = fields[0];
+ BSONElement& fieldO = fields[1];
+ BSONElement& fieldUI = fields[2];
+ BSONElement& fieldNs = fields[3];
+ BSONElement& fieldOp = fields[4];
+ BSONElement& fieldB = fields[5];
+ BSONElement& fieldO2 = fields[6];
BSONObj o;
if (fieldO.isABSONObj())
o = fieldO.embeddedObject();
+ SnapshotName timestamp;
+ if (fieldTs.ok()) {
+ timestamp = SnapshotName(fieldTs.timestamp());
+ }
// operation type -- see logOp() comments for types
const char* opType = fieldOp.valuestrsafe();
@@ -992,7 +1017,7 @@ Status applyOperation_inlock(OperationContext* opCtx,
for (auto elem : fieldO.Obj()) {
// Note: we don't care about statement ids here since the secondaries don't create
// their own oplog entries.
- insertObjs.emplace_back(elem.Obj());
+ insertObjs.emplace_back(elem.Obj(), timestamp);
}
uassert(ErrorCodes::OperationFailed,
str::stream() << "Failed to apply insert due to empty array element: "
@@ -1036,8 +1061,8 @@ Status applyOperation_inlock(OperationContext* opCtx,
if (!needToDoUpsert) {
WriteUnitOfWork wuow(opCtx);
OpDebug* const nullOpDebug = nullptr;
- auto status =
- collection->insertDocument(opCtx, InsertStatement(o), nullOpDebug, true);
+ auto status = collection->insertDocument(
+ opCtx, InsertStatement(o, timestamp), nullOpDebug, true);
if (status.isOK()) {
wuow.commit();
} else if (status == ErrorCodes::DuplicateKey) {
@@ -1286,7 +1311,6 @@ Status applyCommand_inlock(OperationContext* opCtx,
WriteUnitOfWork wuow(opCtx);
getGlobalAuthorizationManager()->logOp(opCtx, opType, nss, o, nullptr);
wuow.commit();
-
return Status::OK();
}
diff --git a/src/mongo/db/repl/oplog_buffer_collection_test.cpp b/src/mongo/db/repl/oplog_buffer_collection_test.cpp
index 86d917b875b..2e6c24324a0 100644
--- a/src/mongo/db/repl/oplog_buffer_collection_test.cpp
+++ b/src/mongo/db/repl/oplog_buffer_collection_test.cpp
@@ -270,7 +270,10 @@ TEST_F(OplogBufferCollectionTest, StartupWithExistingCollectionInitializesCorrec
const std::vector<BSONObj> oplog = {makeOplogEntry(1)};
ASSERT_OK(_storageInterface->createCollection(_opCtx.get(), nss, CollectionOptions()));
ASSERT_OK(_storageInterface->insertDocument(
- _opCtx.get(), nss, std::get<0>(OplogBufferCollection::addIdToDocument(oplog[0], {}, 0))));
+ _opCtx.get(),
+ nss,
+ TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(oplog[0], {}, 0)),
+ SnapshotName(0)}));
_assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog);
OplogBufferCollection::Options opts;
@@ -362,7 +365,8 @@ DEATH_TEST_F(OplogBufferCollectionTest,
CollectionOptions collOpts;
collOpts.setNoIdIndex();
ASSERT_OK(_storageInterface->createCollection(_opCtx.get(), nss, collOpts));
- ASSERT_OK(_storageInterface->insertDocument(_opCtx.get(), nss, makeOplogEntry(1)));
+ ASSERT_OK(_storageInterface->insertDocument(
+ _opCtx.get(), nss, TimestampedBSONObj{makeOplogEntry(1), SnapshotName(0)}));
OplogBufferCollection::Options opts;
opts.dropCollectionAtStartup = false;
@@ -375,7 +379,8 @@ DEATH_TEST_F(OplogBufferCollectionTest,
"Fatal assertion 40405 NoSuchKey: Missing expected field \"ts\"") {
auto nss = makeNamespace(_agent);
ASSERT_OK(_storageInterface->createCollection(_opCtx.get(), nss, CollectionOptions()));
- ASSERT_OK(_storageInterface->insertDocument(_opCtx.get(), nss, BSON("_id" << 1)));
+ ASSERT_OK(_storageInterface->insertDocument(
+ _opCtx.get(), nss, TimestampedBSONObj{BSON("_id" << 1), SnapshotName(0)}));
OplogBufferCollection::Options opts;
opts.dropCollectionAtStartup = false;
@@ -389,7 +394,9 @@ DEATH_TEST_F(OplogBufferCollectionTest,
auto nss = makeNamespace(_agent);
ASSERT_OK(_storageInterface->createCollection(_opCtx.get(), nss, CollectionOptions()));
ASSERT_OK(_storageInterface->insertDocument(
- _opCtx.get(), nss, BSON("_id" << BSON("ts" << Timestamp(1, 1)))));
+ _opCtx.get(),
+ nss,
+ TimestampedBSONObj{BSON("_id" << BSON("ts" << Timestamp(1, 1))), SnapshotName(1)}));
OplogBufferCollection::Options opts;
opts.dropCollectionAtStartup = false;
@@ -401,7 +408,10 @@ TEST_F(OplogBufferCollectionTest, PeekWithExistingCollectionReturnsEmptyObjectWh
auto nss = makeNamespace(_agent);
ASSERT_OK(_storageInterface->createCollection(_opCtx.get(), nss, CollectionOptions()));
ASSERT_OK(_storageInterface->insertDocument(
- _opCtx.get(), nss, BSON("_id" << BSON("ts" << Timestamp(1, 1) << "s" << 0))));
+ _opCtx.get(),
+ nss,
+ TimestampedBSONObj{BSON("_id" << BSON("ts" << Timestamp(1, 1) << "s" << 0)),
+ SnapshotName(1)}));
OplogBufferCollection::Options opts;
opts.dropCollectionAtStartup = false;
@@ -419,7 +429,10 @@ TEST_F(OplogBufferCollectionTest,
const std::vector<BSONObj> oplog = {makeOplogEntry(1)};
ASSERT_OK(_storageInterface->createCollection(_opCtx.get(), nss, CollectionOptions()));
ASSERT_OK(_storageInterface->insertDocument(
- _opCtx.get(), nss, std::get<0>(OplogBufferCollection::addIdToDocument(oplog[0], {}, 0))));
+ _opCtx.get(),
+ nss,
+ TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(oplog[0], {}, 0)),
+ SnapshotName(0)}));
OplogBufferCollection::Options opts;
OplogBufferCollection oplogBuffer(_storageInterface, nss, opts);
@@ -519,7 +532,10 @@ TEST_F(OplogBufferCollectionTest, PeekingFromExistingCollectionReturnsDocument)
const std::vector<BSONObj> oplog = {makeOplogEntry(1), makeOplogEntry(2)};
ASSERT_OK(_storageInterface->createCollection(_opCtx.get(), nss, CollectionOptions()));
ASSERT_OK(_storageInterface->insertDocument(
- _opCtx.get(), nss, std::get<0>(OplogBufferCollection::addIdToDocument(oplog[0], {}, 0))));
+ _opCtx.get(),
+ nss,
+ TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(oplog[0], {}, 0)),
+ SnapshotName(0)}));
OplogBufferCollection::Options opts;
opts.dropCollectionAtStartup = false;
@@ -659,10 +675,16 @@ TEST_F(OplogBufferCollectionTest,
ASSERT_OK(_storageInterface->createCollection(_opCtx.get(), nss, CollectionOptions()));
auto firstDoc = makeOplogEntry(1);
ASSERT_OK(_storageInterface->insertDocument(
- _opCtx.get(), nss, std::get<0>(OplogBufferCollection::addIdToDocument(firstDoc, {}, 0))));
+ _opCtx.get(),
+ nss,
+ TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(firstDoc, {}, 0)),
+ SnapshotName(0)}));
auto secondDoc = makeOplogEntry(2);
ASSERT_OK(_storageInterface->insertDocument(
- _opCtx.get(), nss, std::get<0>(OplogBufferCollection::addIdToDocument(secondDoc, {}, 0))));
+ _opCtx.get(),
+ nss,
+ TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(secondDoc, {}, 0)),
+ SnapshotName(0)}));
OplogBufferCollection::Options opts;
opts.dropCollectionAtStartup = false;
@@ -779,7 +801,9 @@ TEST_F(OplogBufferCollectionTest, WaitForDataReturnsImmediatelyWhenStartedWithEx
ASSERT_OK(_storageInterface->insertDocument(
_opCtx.get(),
nss,
- std::get<0>(OplogBufferCollection::addIdToDocument(makeOplogEntry(1), {}, 0))));
+ TimestampedBSONObj{
+ std::get<0>(OplogBufferCollection::addIdToDocument(makeOplogEntry(1), {}, 0)),
+ SnapshotName(0)}));
OplogBufferCollection::Options opts;
opts.dropCollectionAtStartup = false;
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp b/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp
index 03885151087..34e2f2cae72 100644
--- a/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp
+++ b/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp
@@ -305,12 +305,13 @@ TEST_F(ReplicationConsistencyMarkersTest, OplogTruncateAfterPointUpgrade) {
ASSERT_OK(getStorageInterface()->insertDocument(
opCtx,
minValidNss,
- BSON("_id" << OID::gen() << MinValidDocument::kMinValidTimestampFieldName
- << minValidTime.getTimestamp()
- << MinValidDocument::kMinValidTermFieldName
- << minValidTime.getTerm()
- << MinValidDocument::kOldOplogDeleteFromPointFieldName
- << time1)));
+ TimestampedBSONObj{BSON("_id" << OID::gen() << MinValidDocument::kMinValidTimestampFieldName
+ << minValidTime.getTimestamp()
+ << MinValidDocument::kMinValidTermFieldName
+ << minValidTime.getTerm()
+ << MinValidDocument::kOldOplogDeleteFromPointFieldName
+ << time1),
+ SnapshotName(0)}));
consistencyMarkers.initializeMinValidDocument(opCtx);
// Set the feature compatibility version to 3.6.
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.cpp b/src/mongo/db/repl/replication_coordinator_external_state.cpp
index fbeddfba68a..637257adcba 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state.cpp
@@ -33,6 +33,13 @@
namespace mongo {
namespace repl {
+std::string SnapshotInfo::toString() const {
+ BSONObjBuilder bob;
+ bob.append("optime", opTime.toBSON());
+ bob.append("name-id", name.toString());
+ return bob.obj().toString();
+}
+
ReplicationCoordinatorExternalState::ReplicationCoordinatorExternalState() {}
ReplicationCoordinatorExternalState::~ReplicationCoordinatorExternalState() {}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index 617c92cb7fa..fa521055ae7 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -59,6 +59,31 @@ class LastVote;
class ReplSettings;
class ReplicationCoordinator;
+struct SnapshotInfo {
+ OpTime opTime;
+ SnapshotName name;
+
+ bool operator==(const SnapshotInfo& other) const {
+ return std::tie(opTime, name) == std::tie(other.opTime, other.name);
+ }
+ bool operator!=(const SnapshotInfo& other) const {
+ return std::tie(opTime, name) != std::tie(other.opTime, other.name);
+ }
+ bool operator<(const SnapshotInfo& other) const {
+ return std::tie(opTime, name) < std::tie(other.opTime, other.name);
+ }
+ bool operator<=(const SnapshotInfo& other) const {
+ return std::tie(opTime, name) <= std::tie(other.opTime, other.name);
+ }
+ bool operator>(const SnapshotInfo& other) const {
+ return std::tie(opTime, name) > std::tie(other.opTime, other.name);
+ }
+ bool operator>=(const SnapshotInfo& other) const {
+ return std::tie(opTime, name) >= std::tie(other.opTime, other.name);
+ }
+ std::string toString() const;
+};
+
/**
* This class represents the interface the ReplicationCoordinator uses to interact with the
* rest of the system. All functionality of the ReplicationCoordinatorImpl that would introduce
@@ -254,7 +279,7 @@ public:
*
* It is illegal to call with a newCommitPoint that does not name an existing snapshot.
*/
- virtual void updateCommittedSnapshot(SnapshotName newCommitPoint) = 0;
+ virtual void updateCommittedSnapshot(SnapshotInfo newCommitPoint) = 0;
/**
* Creates a new snapshot.
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index f396126c7de..15246d05503 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/commands/feature_compatibility_version.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/jsobj.h"
@@ -379,18 +380,27 @@ Status ReplicationCoordinatorExternalStateImpl::initializeReplSetStorage(Operati
const BSONObj& config) {
try {
createOplog(opCtx);
-
- writeConflictRetry(
- opCtx, "initiate oplog entry", "local.oplog.rs", [this, &opCtx, &config] {
- Lock::GlobalWrite globalWrite(opCtx);
-
- WriteUnitOfWork wuow(opCtx);
- Helpers::putSingleton(opCtx, configCollectionName, config);
- const auto msgObj = BSON("msg"
- << "initiating set");
- _service->getOpObserver()->onOpMessage(opCtx, msgObj);
- wuow.commit();
- });
+ const auto& kRsOplogNamespace = NamespaceString::kRsOplogNamespace;
+
+ writeConflictRetry(opCtx,
+ "initiate oplog entry",
+ kRsOplogNamespace.toString(),
+ [this, &opCtx, &config, &kRsOplogNamespace] {
+ Lock::GlobalWrite globalWrite(opCtx);
+
+ WriteUnitOfWork wuow(opCtx);
+ Helpers::putSingleton(opCtx, configCollectionName, config);
+ const auto msgObj = BSON("msg"
+ << "initiating set");
+ _service->getOpObserver()->onOpMessage(opCtx, msgObj);
+ wuow.commit();
+ // ReplSetTest assumes that immediately after the replSetInitiate
+ // command returns, it can allow other nodes to initial sync with no
+ // retries and they will succeed. Unfortunately, initial sync will
+ // fail if it finds its sync source has an empty oplog. Thus, we
+ // need to wait here until the seed document is visible in our oplog.
+ waitForAllEarlierOplogWritesToBeVisible(opCtx);
+ });
FeatureCompatibilityVersion::setIfCleanStartup(opCtx, _storageInterface);
} catch (const DBException& ex) {
@@ -778,10 +788,10 @@ void ReplicationCoordinatorExternalStateImpl::dropAllSnapshots() {
manager->dropAllSnapshots();
}
-void ReplicationCoordinatorExternalStateImpl::updateCommittedSnapshot(SnapshotName newCommitPoint) {
+void ReplicationCoordinatorExternalStateImpl::updateCommittedSnapshot(SnapshotInfo newCommitPoint) {
auto manager = _service->getGlobalStorageEngine()->getSnapshotManager();
invariant(manager); // This should never be called if there is no SnapshotManager.
- manager->setCommittedSnapshot(newCommitPoint);
+ manager->setCommittedSnapshot(newCommitPoint.name, newCommitPoint.opTime.getTimestamp());
}
void ReplicationCoordinatorExternalStateImpl::createSnapshot(OperationContext* opCtx,
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 0893bdc16bd..7e888f24ca0 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -100,7 +100,7 @@ public:
virtual void stopProducer();
virtual void startProducerIfStopped();
void dropAllSnapshots() final;
- void updateCommittedSnapshot(SnapshotName newCommitPoint) final;
+ void updateCommittedSnapshot(SnapshotInfo newCommitPoint) final;
void createSnapshot(OperationContext* opCtx, SnapshotName name) final;
void forceSnapshotCreation() final;
virtual bool snapshotsEnabled() const;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index 1f544138fcc..70bdff0601f 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -211,7 +211,7 @@ void ReplicationCoordinatorExternalStateMock::startProducerIfStopped() {}
void ReplicationCoordinatorExternalStateMock::dropAllSnapshots() {}
-void ReplicationCoordinatorExternalStateMock::updateCommittedSnapshot(SnapshotName newCommitPoint) {
+void ReplicationCoordinatorExternalStateMock::updateCommittedSnapshot(SnapshotInfo newCommitPoint) {
}
void ReplicationCoordinatorExternalStateMock::createSnapshot(OperationContext* opCtx,
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index c18c8c8fc16..8478f9a05f5 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -87,7 +87,7 @@ public:
virtual void stopProducer();
virtual void startProducerIfStopped();
virtual void dropAllSnapshots();
- virtual void updateCommittedSnapshot(SnapshotName newCommitPoint);
+ virtual void updateCommittedSnapshot(SnapshotInfo newCommitPoint);
virtual void createSnapshot(OperationContext* opCtx, SnapshotName name);
virtual void forceSnapshotCreation();
virtual bool snapshotsEnabled() const;
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 35bb765f8f5..fdcddefa53d 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -274,13 +274,6 @@ InitialSyncerOptions createInitialSyncerOptions(
}
} // namespace
-std::string ReplicationCoordinatorImpl::SnapshotInfo::toString() const {
- BSONObjBuilder bob;
- bob.append("optime", opTime.toBSON());
- bob.append("name-id", name.toString());
- return bob.obj().toString();
-}
-
ReplicationCoordinatorImpl::ReplicationCoordinatorImpl(
ServiceContext* service,
const ReplSettings& settings,
@@ -3359,7 +3352,7 @@ void ReplicationCoordinatorImpl::_updateCommittedSnapshot_inlock(
_currentCommittedSnapshot = newCommittedSnapshot;
_currentCommittedSnapshotCond.notify_all();
- _externalState->updateCommittedSnapshot(newCommittedSnapshot.name);
+ _externalState->updateCommittedSnapshot(newCommittedSnapshot);
// Wake up any threads waiting for read concern or write concern.
_wakeReadyWaiters_inlock();
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 55a5aa80d2e..2e9349f9d73 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -419,31 +419,6 @@ private:
using ScheduleFn = stdx::function<StatusWith<executor::TaskExecutor::CallbackHandle>(
const executor::TaskExecutor::CallbackFn& work)>;
- struct SnapshotInfo {
- OpTime opTime;
- SnapshotName name;
-
- bool operator==(const SnapshotInfo& other) const {
- return std::tie(opTime, name) == std::tie(other.opTime, other.name);
- }
- bool operator!=(const SnapshotInfo& other) const {
- return std::tie(opTime, name) != std::tie(other.opTime, other.name);
- }
- bool operator<(const SnapshotInfo& other) const {
- return std::tie(opTime, name) < std::tie(other.opTime, other.name);
- }
- bool operator<=(const SnapshotInfo& other) const {
- return std::tie(opTime, name) <= std::tie(other.opTime, other.name);
- }
- bool operator>(const SnapshotInfo& other) const {
- return std::tie(opTime, name) > std::tie(other.opTime, other.name);
- }
- bool operator>=(const SnapshotInfo& other) const {
- return std::tie(opTime, name) >= std::tie(other.opTime, other.name);
- }
- std::string toString() const;
- };
-
class LoseElectionGuardV1;
class LoseElectionDryRunGuardV1;
diff --git a/src/mongo/db/repl/replication_process_test.cpp b/src/mongo/db/repl/replication_process_test.cpp
index 40a51312433..3950e27418b 100644
--- a/src/mongo/db/repl/replication_process_test.cpp
+++ b/src/mongo/db/repl/replication_process_test.cpp
@@ -111,16 +111,18 @@ TEST_F(ReplicationProcessTest,
// Collection is not empty but does not contain document with _id "rollbackProgress".
ASSERT_OK(_storageInterface->insertDocument(opCtx.get(),
ReplicationProcess::kRollbackProgressNamespace,
- BSON("_id"
- << "not progress")));
+ TimestampedBSONObj{BSON("_id"
+ << "not progress"),
+ SnapshotName(0)}));
ASSERT_EQUALS(ErrorCodes::NoSuchKey, replicationProcess.getRollbackProgress(opCtx.get()));
}
TEST_F(ReplicationProcessTest, GetRollbackProgressReturnsBadStatusIfApplyUntilFieldIsNotAnOpTime) {
- auto doc = BSON("_id"
- << "rollbackProgress"
- << "applyUntil"
- << "not op time!");
+ auto doc = TimestampedBSONObj{BSON("_id"
+ << "rollbackProgress"
+ << "applyUntil"
+ << "not op time!"),
+ SnapshotName(0)};
auto opCtx = makeOpCtx();
ASSERT_OK(_storageInterface->createCollection(
opCtx.get(), ReplicationProcess::kRollbackProgressNamespace, {}));
@@ -136,13 +138,14 @@ TEST_F(ReplicationProcessTest, GetRollbackProgressReturnsBadStatusIfApplyUntilFi
TEST_F(ReplicationProcessTest,
GetRollbackProgressReturnsTypeMismatchIfApplyUntilFieldContainsBadTimestampValue) {
- auto doc = BSON("_id"
- << "rollbackProgress"
- << "applyUntil"
- << BSON("ts"
- << "not_timestamp"
- << "t"
- << 1LL));
+ auto doc = TimestampedBSONObj{BSON("_id"
+ << "rollbackProgress"
+ << "applyUntil"
+ << BSON("ts"
+ << "not_timestamp"
+ << "t"
+ << 1LL)),
+ SnapshotName(0)};
auto opCtx = makeOpCtx();
ASSERT_OK(_storageInterface->createCollection(
opCtx.get(), ReplicationProcess::kRollbackProgressNamespace, {}));
@@ -159,10 +162,11 @@ TEST_F(ReplicationProcessTest,
TEST_F(ReplicationProcessTest,
GetRollbackProgressReturnsApplyUntilOpTimeIfDocumentExistsWithIdProgress) {
OpTime applyUntil({Seconds(123), 0}, 1LL);
- auto doc = BSON("_id"
- << "rollbackProgress"
- << "applyUntil"
- << applyUntil);
+ auto doc = TimestampedBSONObj{BSON("_id"
+ << "rollbackProgress"
+ << "applyUntil"
+ << applyUntil),
+ SnapshotName(0)};
auto opCtx = makeOpCtx();
ASSERT_OK(_storageInterface->createCollection(
opCtx.get(), ReplicationProcess::kRollbackProgressNamespace, {}));
diff --git a/src/mongo/db/repl/replication_recovery_test.cpp b/src/mongo/db/repl/replication_recovery_test.cpp
index 5a63cd5cc4a..7d91431d949 100644
--- a/src/mongo/db/repl/replication_recovery_test.cpp
+++ b/src/mongo/db/repl/replication_recovery_test.cpp
@@ -137,11 +137,12 @@ BSONObj _makeInsertDocument(int t) {
/**
* Generates oplog entries with the given number used for the timestamp.
*/
-BSONObj _makeOplogEntry(int t) {
- return BSON("ts" << Timestamp(t, t) << "h" << t << "ns" << testNs.ns() << "v" << 2 << "op"
- << "i"
- << "o"
- << _makeInsertDocument(t));
+TimestampedBSONObj _makeOplogEntry(int t) {
+ return {BSON("ts" << Timestamp(t, t) << "h" << t << "ns" << testNs.ns() << "v" << 2 << "op"
+ << "i"
+ << "o"
+ << _makeInsertDocument(t)),
+ SnapshotName(t)};
}
/**
@@ -188,7 +189,7 @@ void _assertDocumentsInCollectionEquals(OperationContext* opCtx,
void _assertDocsInOplog(OperationContext* opCtx, std::vector<int> timestamps) {
std::vector<BSONObj> expectedOplog(timestamps.size());
std::transform(timestamps.begin(), timestamps.end(), expectedOplog.begin(), [](int ts) {
- return _makeOplogEntry(ts);
+ return _makeOplogEntry(ts).obj;
});
_assertDocumentsInCollectionEquals(opCtx, oplogNs, expectedOplog);
}
diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h
index 764bdcadad3..27507e790e8 100644
--- a/src/mongo/db/repl/storage_interface.h
+++ b/src/mongo/db/repl/storage_interface.h
@@ -42,6 +42,7 @@
#include "mongo/db/repl/collection_bulk_loader.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/storage/snapshot_name.h"
namespace mongo {
@@ -51,6 +52,11 @@ class OperationContext;
namespace repl {
+struct TimestampedBSONObj {
+ BSONObj obj;
+ SnapshotName timestamp;
+};
+
/**
* Storage interface used by the replication system to interact with storage.
* This interface provides seperation of concerns and a place for mocking out test
@@ -98,17 +104,18 @@ public:
const std::vector<BSONObj>& secondaryIndexSpecs) = 0;
/**
- * Inserts a document into a collection.
+ * Inserts a document with a timestamp into a collection.
*
* NOTE: If the collection doesn't exist, it will not be created, and instead
* an error is returned.
*/
virtual Status insertDocument(OperationContext* opCtx,
const NamespaceString& nss,
- const BSONObj& doc) = 0;
+ const TimestampedBSONObj& doc) = 0;
/**
- * Inserts the given documents into the collection.
+ * Inserts the given documents, with associated timestamps and statement id's, into the
+ * collection.
* It is an error to call this function with an empty set of documents.
*/
virtual Status insertDocuments(OperationContext* opCtx,
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index a2dca57abdd..35e67cbbe9f 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -126,7 +126,8 @@ Status StorageInterfaceImpl::initializeRollbackID(OperationContext* opCtx) {
BSONObjBuilder bob;
rbid.serialize(&bob);
- return insertDocument(opCtx, _rollbackIdNss, bob.done());
+ SnapshotName noTimestamp; // This write is not replicated.
+ return insertDocument(opCtx, _rollbackIdNss, TimestampedBSONObj{bob.done(), noTimestamp});
}
Status StorageInterfaceImpl::incrementRollbackID(OperationContext* opCtx) {
@@ -263,8 +264,8 @@ StorageInterfaceImpl::createCollectionForBulkLoading(
Status StorageInterfaceImpl::insertDocument(OperationContext* opCtx,
const NamespaceString& nss,
- const BSONObj& doc) {
- return insertDocuments(opCtx, nss, {InsertStatement(doc)});
+ const TimestampedBSONObj& doc) {
+ return insertDocuments(opCtx, nss, {InsertStatement(doc.obj, doc.timestamp)});
}
namespace {
diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h
index 1b48fe6c87d..572cf4eb992 100644
--- a/src/mongo/db/repl/storage_interface_impl.h
+++ b/src/mongo/db/repl/storage_interface_impl.h
@@ -66,7 +66,7 @@ public:
Status insertDocument(OperationContext* opCtx,
const NamespaceString& nss,
- const BSONObj& doc) override;
+ const TimestampedBSONObj& doc) override;
Status insertDocuments(OperationContext* opCtx,
const NamespaceString& nss,
diff --git a/src/mongo/db/repl/storage_interface_impl_test.cpp b/src/mongo/db/repl/storage_interface_impl_test.cpp
index 5e6ba445814..f86d56f1a25 100644
--- a/src/mongo/db/repl/storage_interface_impl_test.cpp
+++ b/src/mongo/db/repl/storage_interface_impl_test.cpp
@@ -116,12 +116,12 @@ void createCollection(OperationContext* opCtx,
/**
* Creates an oplog entry with given optime.
*/
-BSONObj makeOplogEntry(OpTime opTime) {
+TimestampedBSONObj makeOplogEntry(OpTime opTime) {
BSONObjBuilder bob(opTime.toBSON());
bob.append("h", 1LL);
bob.append("op", "c");
bob.append("ns", "test.t");
- return bob.obj();
+ return {bob.obj(), SnapshotName(opTime.getTimestamp())};
}
/**
@@ -142,6 +142,13 @@ std::vector<InsertStatement> transformInserts(std::vector<BSONObj> docs) {
});
return inserts;
}
+std::vector<InsertStatement> transformInserts(std::vector<TimestampedBSONObj> docs) {
+ std::vector<InsertStatement> inserts(docs.size());
+ std::transform(docs.cbegin(), docs.cend(), inserts.begin(), [](const TimestampedBSONObj& doc) {
+ return InsertStatement(doc.obj, doc.timestamp);
+ });
+ return inserts;
+}
class StorageInterfaceImplTest : public ServiceContextMongoDTest {
protected:
@@ -244,6 +251,19 @@ void _assertDocumentsInCollectionEquals(OperationContext* opCtx,
ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus());
}
+void _assertDocumentsInCollectionEquals(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const std::vector<TimestampedBSONObj>& docs) {
+ std::vector<TimestampedBSONObj> reversedDocs(docs);
+ std::reverse(reversedDocs.begin(), reversedDocs.end());
+ OplogInterfaceLocal oplog(opCtx, nss.ns());
+ auto iter = oplog.makeIterator();
+ for (const auto& doc : reversedDocs) {
+ ASSERT_BSONOBJ_EQ(doc.obj, unittest::assertGet(iter->next()).first);
+ }
+ ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus());
+}
+
/**
* Check collection contents for a singleton Rollback ID document.
*/
@@ -284,10 +304,11 @@ TEST_F(StorageInterfaceImplTest, IncrementRollbackIDRollsToZeroWhenExceedingMaxI
auto opCtx = getOperationContext();
NamespaceString nss(StorageInterfaceImpl::kDefaultRollbackIdNamespace);
createCollection(opCtx, nss);
- auto maxDoc = {BSON("_id" << StorageInterfaceImpl::kRollbackIdDocumentId
- << StorageInterfaceImpl::kRollbackIdFieldName
- << std::numeric_limits<int>::max())};
- ASSERT_OK(storage.insertDocuments(opCtx, nss, transformInserts(maxDoc)));
+ TimestampedBSONObj maxDoc = {BSON("_id" << StorageInterfaceImpl::kRollbackIdDocumentId
+ << StorageInterfaceImpl::kRollbackIdFieldName
+ << std::numeric_limits<int>::max()),
+ SnapshotName(0)};
+ ASSERT_OK(storage.insertDocument(opCtx, nss, maxDoc));
_assertRollbackIDDocument(opCtx, std::numeric_limits<int>::max());
auto rbid = unittest::assertGet(storage.getRollbackID(opCtx));
@@ -313,8 +334,10 @@ TEST_F(StorageInterfaceImplTest, GetRollbackIDReturnsBadStatusIfDocumentHasBadFi
createCollection(opCtx, nss);
- auto badDoc = {BSON("_id" << StorageInterfaceImpl::kRollbackIdDocumentId << "bad field" << 3)};
- ASSERT_OK(storage.insertDocuments(opCtx, nss, transformInserts(badDoc)));
+ std::vector<TimestampedBSONObj> badDocs = {
+ {BSON("_id" << StorageInterfaceImpl::kRollbackIdDocumentId << "bad field" << 3),
+ SnapshotName(0)}};
+ ASSERT_OK(storage.insertDocuments(opCtx, nss, transformInserts(badDocs)));
ASSERT_EQUALS(ErrorCodes::fromInt(40415), storage.getRollbackID(opCtx).getStatus());
}
@@ -325,9 +348,11 @@ TEST_F(StorageInterfaceImplTest, GetRollbackIDReturnsBadStatusIfRollbackIDIsNotI
createCollection(opCtx, nss);
- auto badDoc = {BSON("_id" << StorageInterfaceImpl::kRollbackIdDocumentId
- << StorageInterfaceImpl::kRollbackIdFieldName
- << "bad id")};
+ std::vector<TimestampedBSONObj> badDoc = {
+ TimestampedBSONObj{BSON("_id" << StorageInterfaceImpl::kRollbackIdDocumentId
+ << StorageInterfaceImpl::kRollbackIdFieldName
+ << "bad id"),
+ SnapshotName(0)}};
ASSERT_OK(storage.insertDocuments(opCtx, nss, transformInserts(badDoc)));
ASSERT_EQUALS(ErrorCodes::TypeMismatch, storage.getRollbackID(opCtx).getStatus());
}
@@ -410,8 +435,8 @@ TEST_F(StorageInterfaceImplTest, InsertDocumentsSavesOperationsReturnsOpTimeOfLa
// Check contents of oplog. OplogInterface iterates over oplog collection in reverse.
repl::OplogInterfaceLocal oplog(opCtx, nss.ns());
auto iter = oplog.makeIterator();
- ASSERT_BSONOBJ_EQ(op2, unittest::assertGet(iter->next()).first);
- ASSERT_BSONOBJ_EQ(op1, unittest::assertGet(iter->next()).first);
+ ASSERT_BSONOBJ_EQ(op2.obj, unittest::assertGet(iter->next()).first);
+ ASSERT_BSONOBJ_EQ(op1.obj, unittest::assertGet(iter->next()).first);
ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus());
}
@@ -434,7 +459,7 @@ TEST_F(StorageInterfaceImplTest, InsertMissingDocWorksOnExistingCappedCollection
opts.capped = true;
opts.cappedSize = 1024 * 1024;
createCollection(opCtx, nss, opts);
- ASSERT_OK(storage.insertDocument(opCtx, nss, BSON("_id" << 1)));
+ ASSERT_OK(storage.insertDocument(opCtx, nss, {BSON("_id" << 1), SnapshotName(1)}));
AutoGetCollectionForReadCommand autoColl(opCtx, nss);
ASSERT_TRUE(autoColl.getCollection());
}
@@ -444,7 +469,7 @@ TEST_F(StorageInterfaceImplTest, InsertMissingDocWorksOnExistingCollection) {
StorageInterfaceImpl storage;
auto nss = makeNamespace(_agent);
createCollection(opCtx, nss);
- ASSERT_OK(storage.insertDocument(opCtx, nss, BSON("_id" << 1)));
+ ASSERT_OK(storage.insertDocument(opCtx, nss, {BSON("_id" << 1), SnapshotName(1)}));
AutoGetCollectionForReadCommand autoColl(opCtx, nss);
ASSERT_TRUE(autoColl.getCollection());
}
@@ -453,7 +478,7 @@ TEST_F(StorageInterfaceImplTest, InsertMissingDocFailesIfCollectionIsMissing) {
auto opCtx = getOperationContext();
StorageInterfaceImpl storage;
auto nss = makeNamespace(_agent);
- const auto status = storage.insertDocument(opCtx, nss, BSON("_id" << 1));
+ const auto status = storage.insertDocument(opCtx, nss, {BSON("_id" << 1), SnapshotName(1)});
ASSERT_NOT_OK(status);
ASSERT_EQ(status.code(), ErrorCodes::NamespaceNotFound);
}
@@ -623,7 +648,7 @@ TEST_F(StorageInterfaceImplTest, DropCollectionWorksWithExistingWithDataCollecti
StorageInterfaceImpl storage;
auto nss = makeNamespace(_agent);
createCollection(opCtx, nss);
- ASSERT_OK(storage.insertDocument(opCtx, nss, BSON("_id" << 1)));
+ ASSERT_OK(storage.insertDocument(opCtx, nss, {BSON("_id" << 1), SnapshotName(1)}));
ASSERT_OK(storage.dropCollection(opCtx, nss));
}
@@ -893,13 +918,14 @@ TEST_F(StorageInterfaceImplTest,
auto nss = makeNamespace(_agent);
auto indexName = "_id_"_sd;
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- ASSERT_OK(storage.insertDocuments(opCtx,
- nss,
- transformInserts({BSON("_id" << 0),
- BSON("_id" << 1),
- BSON("_id" << 2),
- BSON("_id" << 3),
- BSON("_id" << 4)})));
+ ASSERT_OK(storage.insertDocuments(
+ opCtx,
+ nss,
+ transformInserts(std::vector<TimestampedBSONObj>{{BSON("_id" << 0), SnapshotName(0)},
+ {BSON("_id" << 1), SnapshotName(1)},
+ {BSON("_id" << 2), SnapshotName(2)},
+ {BSON("_id" << 3), SnapshotName(3)},
+ {BSON("_id" << 4), SnapshotName(4)}})));
// startKey not provided
ASSERT_BSONOBJ_EQ(
@@ -1031,11 +1057,11 @@ TEST_F(StorageInterfaceImplTest,
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
ASSERT_OK(storage.insertDocuments(opCtx,
nss,
- transformInserts({BSON("_id" << 0),
- BSON("_id" << 1),
- BSON("_id" << 2),
- BSON("_id" << 3),
- BSON("_id" << 4)})));
+ {{BSON("_id" << 0), SnapshotName(0)},
+ {BSON("_id" << 1), SnapshotName(1)},
+ {BSON("_id" << 2), SnapshotName(2)},
+ {BSON("_id" << 3), SnapshotName(3)},
+ {BSON("_id" << 4), SnapshotName(4)}}));
// startKey not provided
ASSERT_BSONOBJ_EQ(
@@ -1144,8 +1170,11 @@ TEST_F(StorageInterfaceImplTest,
StorageInterfaceImpl storage;
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- ASSERT_OK(storage.insertDocuments(
- opCtx, nss, transformInserts({BSON("_id" << 1), BSON("_id" << 2), BSON("_id" << 0)})));
+ ASSERT_OK(storage.insertDocuments(opCtx,
+ nss,
+ {{BSON("_id" << 1), SnapshotName(1)},
+ {BSON("_id" << 2), SnapshotName(2)},
+ {BSON("_id" << 0), SnapshotName(0)}}));
ASSERT_BSONOBJ_EQ(
BSON("_id" << 1),
_assetGetFront(storage.findDocuments(opCtx,
@@ -1171,8 +1200,11 @@ TEST_F(StorageInterfaceImplTest,
StorageInterfaceImpl storage;
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- ASSERT_OK(storage.insertDocuments(
- opCtx, nss, transformInserts({BSON("_id" << 1), BSON("_id" << 2), BSON("_id" << 0)})));
+ ASSERT_OK(storage.insertDocuments(opCtx,
+ nss,
+ {{BSON("_id" << 1), SnapshotName(1)},
+ {BSON("_id" << 2), SnapshotName(2)},
+ {BSON("_id" << 0), SnapshotName(0)}}));
ASSERT_BSONOBJ_EQ(
BSON("_id" << 0),
_assetGetFront(storage.findDocuments(opCtx,
@@ -1192,8 +1224,11 @@ TEST_F(StorageInterfaceImplTest, FindDocumentsCollScanReturnsNoSuchKeyIfStartKey
StorageInterfaceImpl storage;
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- ASSERT_OK(storage.insertDocuments(
- opCtx, nss, transformInserts({BSON("_id" << 1), BSON("_id" << 2), BSON("_id" << 0)})));
+ ASSERT_OK(storage.insertDocuments(opCtx,
+ nss,
+ {{BSON("_id" << 1), SnapshotName(1)},
+ {BSON("_id" << 2), SnapshotName(2)},
+ {BSON("_id" << 0), SnapshotName(0)}}));
ASSERT_EQUALS(ErrorCodes::NoSuchKey,
storage
.findDocuments(opCtx,
@@ -1212,8 +1247,11 @@ TEST_F(StorageInterfaceImplTest,
StorageInterfaceImpl storage;
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- ASSERT_OK(storage.insertDocuments(
- opCtx, nss, transformInserts({BSON("_id" << 1), BSON("_id" << 2), BSON("_id" << 0)})));
+ ASSERT_OK(storage.insertDocuments(opCtx,
+ nss,
+ {{BSON("_id" << 1), SnapshotName(0)},
+ {BSON("_id" << 2), SnapshotName(0)},
+ {BSON("_id" << 0), SnapshotName(0)}}));
ASSERT_EQUALS(ErrorCodes::InvalidOptions,
storage
.findDocuments(opCtx,
@@ -1287,14 +1325,14 @@ TEST_F(StorageInterfaceImplTest,
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
ASSERT_OK(storage.insertDocuments(opCtx,
nss,
- transformInserts({BSON("_id" << 0),
- BSON("_id" << 1),
- BSON("_id" << 2),
- BSON("_id" << 3),
- BSON("_id" << 4),
- BSON("_id" << 5),
- BSON("_id" << 6),
- BSON("_id" << 7)})));
+ {{BSON("_id" << 0), SnapshotName(0)},
+ {BSON("_id" << 1), SnapshotName(1)},
+ {BSON("_id" << 2), SnapshotName(2)},
+ {BSON("_id" << 3), SnapshotName(3)},
+ {BSON("_id" << 4), SnapshotName(4)},
+ {BSON("_id" << 5), SnapshotName(5)},
+ {BSON("_id" << 6), SnapshotName(6)},
+ {BSON("_id" << 7), SnapshotName(7)}}));
// startKey not provided
ASSERT_BSONOBJ_EQ(
@@ -1397,14 +1435,14 @@ TEST_F(StorageInterfaceImplTest,
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
ASSERT_OK(storage.insertDocuments(opCtx,
nss,
- transformInserts({BSON("_id" << 0),
- BSON("_id" << 1),
- BSON("_id" << 2),
- BSON("_id" << 3),
- BSON("_id" << 4),
- BSON("_id" << 5),
- BSON("_id" << 6),
- BSON("_id" << 7)})));
+ {{BSON("_id" << 0), SnapshotName(0)},
+ {BSON("_id" << 1), SnapshotName(1)},
+ {BSON("_id" << 2), SnapshotName(2)},
+ {BSON("_id" << 3), SnapshotName(3)},
+ {BSON("_id" << 4), SnapshotName(4)},
+ {BSON("_id" << 5), SnapshotName(5)},
+ {BSON("_id" << 6), SnapshotName(6)},
+ {BSON("_id" << 7), SnapshotName(7)}}));
// startKey not provided
ASSERT_BSONOBJ_EQ(
@@ -1504,8 +1542,11 @@ TEST_F(StorageInterfaceImplTest,
StorageInterfaceImpl storage;
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- ASSERT_OK(storage.insertDocuments(
- opCtx, nss, transformInserts({BSON("_id" << 1), BSON("_id" << 2), BSON("_id" << 0)})));
+ ASSERT_OK(storage.insertDocuments(opCtx,
+ nss,
+ {{BSON("_id" << 1), SnapshotName(0)},
+ {BSON("_id" << 2), SnapshotName(0)},
+ {BSON("_id" << 0), SnapshotName(0)}}));
ASSERT_BSONOBJ_EQ(
BSON("_id" << 1),
_assetGetFront(storage.deleteDocuments(opCtx,
@@ -1525,8 +1566,11 @@ TEST_F(StorageInterfaceImplTest,
StorageInterfaceImpl storage;
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- ASSERT_OK(storage.insertDocuments(
- opCtx, nss, transformInserts({BSON("_id" << 1), BSON("_id" << 2), BSON("_id" << 0)})));
+ ASSERT_OK(storage.insertDocuments(opCtx,
+ nss,
+ {{BSON("_id" << 1), SnapshotName(0)},
+ {BSON("_id" << 2), SnapshotName(0)},
+ {BSON("_id" << 0), SnapshotName(0)}}));
ASSERT_BSONOBJ_EQ(
BSON("_id" << 0),
_assetGetFront(storage.deleteDocuments(opCtx,
@@ -1545,8 +1589,11 @@ TEST_F(StorageInterfaceImplTest, DeleteDocumentsCollScanReturnsNoSuchKeyIfStartK
StorageInterfaceImpl storage;
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- ASSERT_OK(storage.insertDocuments(
- opCtx, nss, transformInserts({BSON("_id" << 1), BSON("_id" << 2), BSON("_id" << 0)})));
+ ASSERT_OK(storage.insertDocuments(opCtx,
+ nss,
+ {{BSON("_id" << 1), SnapshotName(0)},
+ {BSON("_id" << 2), SnapshotName(0)},
+ {BSON("_id" << 0), SnapshotName(0)}}));
ASSERT_EQUALS(ErrorCodes::NoSuchKey,
storage
.deleteDocuments(opCtx,
@@ -1565,8 +1612,11 @@ TEST_F(StorageInterfaceImplTest,
StorageInterfaceImpl storage;
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- ASSERT_OK(storage.insertDocuments(
- opCtx, nss, transformInserts({BSON("_id" << 1), BSON("_id" << 2), BSON("_id" << 0)})));
+ ASSERT_OK(storage.insertDocuments(opCtx,
+ nss,
+ {{BSON("_id" << 1), SnapshotName(0)},
+ {BSON("_id" << 2), SnapshotName(0)},
+ {BSON("_id" << 0), SnapshotName(0)}}));
ASSERT_EQUALS(ErrorCodes::InvalidOptions,
storage
.deleteDocuments(opCtx,
@@ -1610,7 +1660,8 @@ TEST_F(StorageInterfaceImplTest,
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
auto doc1 = BSON("_id" << 0 << "x" << 0);
auto doc2 = BSON("_id" << 1 << "x" << 1);
- ASSERT_OK(storage.insertDocuments(opCtx, nss, transformInserts({doc1, doc2})));
+ ASSERT_OK(
+ storage.insertDocuments(opCtx, nss, {{doc1, SnapshotName(0)}, {doc2, SnapshotName(0)}}));
ASSERT_EQUALS(ErrorCodes::TooManyMatchingDocuments,
storage.findSingleton(opCtx, nss).getStatus());
}
@@ -1621,7 +1672,7 @@ TEST_F(StorageInterfaceImplTest, FindSingletonReturnsDocumentWhenSingletonDocume
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
auto doc1 = BSON("_id" << 0 << "x" << 0);
- ASSERT_OK(storage.insertDocument(opCtx, nss, doc1));
+ ASSERT_OK(storage.insertDocument(opCtx, nss, {doc1, SnapshotName(0)}));
ASSERT_BSONOBJ_EQ(doc1, unittest::assertGet(storage.findSingleton(opCtx, nss)));
}
@@ -1661,7 +1712,7 @@ TEST_F(StorageInterfaceImplTest, PutSingletonUpdatesDocumentWhenCollectionIsNotE
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
auto doc1 = BSON("_id" << 0 << "x" << 0);
- ASSERT_OK(storage.insertDocument(opCtx, nss, doc1));
+ ASSERT_OK(storage.insertDocument(opCtx, nss, {doc1, SnapshotName(0)}));
auto update = BSON("$set" << BSON("x" << 1));
ASSERT_OK(storage.putSingleton(opCtx, nss, update));
ASSERT_BSONOBJ_EQ(BSON("_id" << 0 << "x" << 1),
@@ -1676,7 +1727,8 @@ TEST_F(StorageInterfaceImplTest, PutSingletonUpdatesFirstDocumentWhenCollectionI
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
auto doc1 = BSON("_id" << 0 << "x" << 0);
auto doc2 = BSON("_id" << 1 << "x" << 1);
- ASSERT_OK(storage.insertDocuments(opCtx, nss, transformInserts({doc1, doc2})));
+ ASSERT_OK(
+ storage.insertDocuments(opCtx, nss, {{doc1, SnapshotName(0)}, {doc2, SnapshotName(0)}}));
auto update = BSON("$set" << BSON("x" << 2));
ASSERT_OK(storage.putSingleton(opCtx, nss, update));
_assertDocumentsInCollectionEquals(opCtx, nss, {BSON("_id" << 0 << "x" << 2), doc2});
@@ -1708,7 +1760,8 @@ TEST_F(StorageInterfaceImplTest, FindByIdReturnsNoSuchKeyWhenDocumentIsNotFound)
auto doc1 = BSON("_id" << 0 << "x" << 0);
auto doc2 = BSON("_id" << 1 << "x" << 1);
auto doc3 = BSON("_id" << 2 << "x" << 2);
- ASSERT_OK(storage.insertDocuments(opCtx, nss, transformInserts({doc1, doc3})));
+ ASSERT_OK(
+ storage.insertDocuments(opCtx, nss, {{doc1, SnapshotName(0)}, {doc3, SnapshotName(0)}}));
ASSERT_EQUALS(ErrorCodes::NoSuchKey, storage.findById(opCtx, nss, doc2["_id"]).getStatus());
}
@@ -1720,7 +1773,8 @@ TEST_F(StorageInterfaceImplTest, FindByIdReturnsDocumentWhenDocumentExists) {
auto doc1 = BSON("_id" << 0 << "x" << 0);
auto doc2 = BSON("_id" << 1 << "x" << 1);
auto doc3 = BSON("_id" << 2 << "x" << 2);
- ASSERT_OK(storage.insertDocuments(opCtx, nss, transformInserts({doc1, doc2, doc3})));
+ ASSERT_OK(storage.insertDocuments(
+ opCtx, nss, {{doc1, SnapshotName(0)}, {doc2, SnapshotName(0)}, {doc3, SnapshotName(0)}}));
ASSERT_BSONOBJ_EQ(doc2, unittest::assertGet(storage.findById(opCtx, nss, doc2["_id"])));
}
@@ -1740,7 +1794,7 @@ TEST_F(StorageInterfaceImplTest, DeleteByIdReturnsNoSuchKeyWhenCollectionIsEmpty
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
auto doc = BSON("_id" << 0 << "x" << 0);
ASSERT_EQUALS(ErrorCodes::NoSuchKey, storage.deleteById(opCtx, nss, doc["_id"]).getStatus());
- _assertDocumentsInCollectionEquals(opCtx, nss, {});
+ _assertDocumentsInCollectionEquals(opCtx, nss, std::vector<BSONObj>{});
}
TEST_F(StorageInterfaceImplTest, DeleteByIdReturnsNoSuchKeyWhenDocumentIsNotFound) {
@@ -1751,7 +1805,8 @@ TEST_F(StorageInterfaceImplTest, DeleteByIdReturnsNoSuchKeyWhenDocumentIsNotFoun
auto doc1 = BSON("_id" << 0 << "x" << 0);
auto doc2 = BSON("_id" << 1 << "x" << 1);
auto doc3 = BSON("_id" << 2 << "x" << 2);
- ASSERT_OK(storage.insertDocuments(opCtx, nss, transformInserts({doc1, doc3})));
+ ASSERT_OK(
+ storage.insertDocuments(opCtx, nss, {{doc1, SnapshotName(0)}, {doc3, SnapshotName(0)}}));
ASSERT_EQUALS(ErrorCodes::NoSuchKey, storage.deleteById(opCtx, nss, doc2["_id"]).getStatus());
_assertDocumentsInCollectionEquals(opCtx, nss, {doc1, doc3});
}
@@ -1764,7 +1819,8 @@ TEST_F(StorageInterfaceImplTest, DeleteByIdReturnsDocumentWhenDocumentExists) {
auto doc1 = BSON("_id" << 0 << "x" << 0);
auto doc2 = BSON("_id" << 1 << "x" << 1);
auto doc3 = BSON("_id" << 2 << "x" << 2);
- ASSERT_OK(storage.insertDocuments(opCtx, nss, transformInserts({doc1, doc2, doc3})));
+ ASSERT_OK(storage.insertDocuments(
+ opCtx, nss, {{doc1, SnapshotName(0)}, {doc2, SnapshotName(0)}, {doc3, SnapshotName(0)}}));
ASSERT_BSONOBJ_EQ(doc2, unittest::assertGet(storage.deleteById(opCtx, nss, doc2["_id"])));
_assertDocumentsInCollectionEquals(opCtx, nss, {doc1, doc3});
}
@@ -1801,11 +1857,11 @@ TEST_F(StorageInterfaceImplTest, UpsertSingleDocumentReplacesExistingDocumentInC
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
auto originalDoc = BSON("_id" << 1 << "x" << 1);
- ASSERT_OK(storage.insertDocuments(
- opCtx,
- nss,
- transformInserts(
- {BSON("_id" << 0 << "x" << 0), originalDoc, BSON("_id" << 2 << "x" << 2)})));
+ ASSERT_OK(storage.insertDocuments(opCtx,
+ nss,
+ {{BSON("_id" << 0 << "x" << 0), SnapshotName(0)},
+ {originalDoc, SnapshotName(0)},
+ {BSON("_id" << 2 << "x" << 2), SnapshotName(2)}}));
ASSERT_OK(storage.upsertById(opCtx, nss, originalDoc["_id"], BSON("x" << 100)));
@@ -1822,10 +1878,10 @@ TEST_F(StorageInterfaceImplTest, UpsertSingleDocumentInsertsNewDocumentInCollect
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- ASSERT_OK(storage.insertDocuments(
- opCtx,
- nss,
- transformInserts({BSON("_id" << 0 << "x" << 0), BSON("_id" << 2 << "x" << 2)})));
+ ASSERT_OK(storage.insertDocuments(opCtx,
+ nss,
+ {{BSON("_id" << 0 << "x" << 0), SnapshotName(0)},
+ {BSON("_id" << 2 << "x" << 2), SnapshotName(2)}}));
ASSERT_OK(storage.upsertById(opCtx, nss, BSON("" << 1).firstElement(), BSON("x" << 100)));
@@ -1850,11 +1906,11 @@ TEST_F(StorageInterfaceImplTest,
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
auto originalDoc = BSON("_id" << 1 << "x" << 1);
- ASSERT_OK(storage.insertDocuments(
- opCtx,
- nss,
- transformInserts(
- {BSON("_id" << 0 << "x" << 0), originalDoc, BSON("_id" << 2 << "x" << 2)})));
+ ASSERT_OK(storage.insertDocuments(opCtx,
+ nss,
+ {{BSON("_id" << 0 << "x" << 0), SnapshotName(0)},
+ {originalDoc, SnapshotName(0)},
+ {BSON("_id" << 2 << "x" << 2), SnapshotName(2)}}));
ASSERT_OK(storage.upsertById(opCtx, nss, originalDoc["_id"], BSON("x" << 100)));
@@ -1964,7 +2020,7 @@ TEST_F(
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
auto doc = BSON("_id" << 0 << "x" << 0);
- ASSERT_OK(storage.insertDocuments(opCtx, nss, transformInserts({doc})));
+ ASSERT_OK(storage.insertDocuments(opCtx, nss, {{doc, SnapshotName(0)}}));
_assertDocumentsInCollectionEquals(opCtx, nss, {doc});
// This test fixture disables replicated writes by default. We want to re-enable this setting
@@ -1989,7 +2045,7 @@ TEST_F(
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
auto doc = BSON("_id" << 0 << "x" << 0);
- ASSERT_OK(storage.insertDocuments(opCtx, nss, transformInserts({doc})));
+ ASSERT_OK(storage.insertDocuments(opCtx, nss, {{doc, SnapshotName(0)}}));
_assertDocumentsInCollectionEquals(opCtx, nss, {doc});
// This test fixture disables replicated writes by default. We want to re-enable this setting
@@ -2030,7 +2086,7 @@ TEST_F(StorageInterfaceImplTest, DeleteByFilterReturnsSuccessIfCollectionIsEmpty
ASSERT_OK(storage.deleteByFilter(opCtx, nss, {}));
- _assertDocumentsInCollectionEquals(opCtx, nss, {});
+ _assertDocumentsInCollectionEquals(opCtx, nss, std::vector<BSONObj>{});
}
TEST_F(StorageInterfaceImplTest, DeleteByFilterLeavesCollectionUnchangedIfNoDocumentsMatchFilter) {
@@ -2039,7 +2095,8 @@ TEST_F(StorageInterfaceImplTest, DeleteByFilterLeavesCollectionUnchangedIfNoDocu
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- auto docs = {BSON("_id" << 0 << "x" << 0), BSON("_id" << 2 << "x" << 2)};
+ std::vector<TimestampedBSONObj> docs = {{BSON("_id" << 0 << "x" << 0), SnapshotName(0)},
+ {BSON("_id" << 2 << "x" << 2), SnapshotName(0)}};
ASSERT_OK(storage.insertDocuments(opCtx, nss, transformInserts(docs)));
auto filter = BSON("x" << 1);
@@ -2054,10 +2111,10 @@ TEST_F(StorageInterfaceImplTest, DeleteByFilterRemoveDocumentsThatMatchFilter) {
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- auto docs = {BSON("_id" << 0 << "x" << 0),
- BSON("_id" << 1 << "x" << 1),
- BSON("_id" << 2 << "x" << 2),
- BSON("_id" << 3 << "x" << 3)};
+ std::vector<TimestampedBSONObj> docs = {{BSON("_id" << 0 << "x" << 0), SnapshotName(0)},
+ {BSON("_id" << 1 << "x" << 1), SnapshotName(0)},
+ {BSON("_id" << 2 << "x" << 2), SnapshotName(0)},
+ {BSON("_id" << 3 << "x" << 3), SnapshotName(0)}};
ASSERT_OK(storage.insertDocuments(opCtx, nss, transformInserts(docs)));
auto filter = BSON("x" << BSON("$in" << BSON_ARRAY(1 << 2)));
@@ -2073,7 +2130,8 @@ TEST_F(StorageInterfaceImplTest, DeleteByFilterUsesIdHackIfFilterContainsIdField
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- auto docs = {BSON("_id" << 0 << "x" << 0), BSON("_id" << 1 << "x" << 1)};
+ std::vector<TimestampedBSONObj> docs = {{BSON("_id" << 0 << "x" << 0), SnapshotName(0)},
+ {BSON("_id" << 1 << "x" << 1), SnapshotName(0)}};
ASSERT_OK(storage.insertDocuments(opCtx, nss, transformInserts(docs)));
auto filter = BSON("_id" << 1);
@@ -2093,10 +2151,10 @@ TEST_F(StorageInterfaceImplTest, DeleteByFilterRemovesDocumentsInIllegalClientSy
StorageInterfaceImpl storage;
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- auto docs = {BSON("_id" << 0 << "x" << 0),
- BSON("_id" << 1 << "x" << 1),
- BSON("_id" << 2 << "x" << 2),
- BSON("_id" << 3 << "x" << 3)};
+ std::vector<TimestampedBSONObj> docs = {{BSON("_id" << 0 << "x" << 0), SnapshotName(0)},
+ {BSON("_id" << 1 << "x" << 1), SnapshotName(0)},
+ {BSON("_id" << 2 << "x" << 2), SnapshotName(0)},
+ {BSON("_id" << 3 << "x" << 3), SnapshotName(0)}};
ASSERT_OK(storage.insertDocuments(opCtx, nss, transformInserts(docs)));
auto filter = BSON("$or" << BSON_ARRAY(BSON("x" << 0) << BSON("_id" << 2)));
@@ -2128,7 +2186,12 @@ TEST_F(StorageInterfaceImplTest,
<< "DEF");
auto doc4 = BSON("_id" << 4 << "x"
<< "def");
- ASSERT_OK(storage.insertDocuments(opCtx, nss, transformInserts({doc1, doc2, doc3, doc4})));
+ ASSERT_OK(storage.insertDocuments(opCtx,
+ nss,
+ {{doc1, SnapshotName(0)},
+ {doc2, SnapshotName(0)},
+ {doc3, SnapshotName(0)},
+ {doc4, SnapshotName(0)}}));
// This filter should remove doc1 and doc2 because the values of the field "x"
    // are equivalent to "aBc" under the case-insensitive collation.
@@ -2173,8 +2236,11 @@ TEST_F(StorageInterfaceImplTest, GetCollectionCountReturnsCollectionCount) {
StorageInterfaceImpl storage;
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- ASSERT_OK(storage.insertDocuments(
- opCtx, nss, transformInserts({BSON("_id" << 1), BSON("_id" << 2), BSON("_id" << 0)})));
+ ASSERT_OK(storage.insertDocuments(opCtx,
+ nss,
+ {{BSON("_id" << 1), SnapshotName(0)},
+ {BSON("_id" << 2), SnapshotName(0)},
+ {BSON("_id" << 0), SnapshotName(0)}}));
auto count = unittest::assertGet(storage.getCollectionCount(opCtx, nss));
ASSERT_EQUALS(3UL, count);
}
@@ -2212,8 +2278,11 @@ TEST_F(StorageInterfaceImplTest, GetCollectionSizeReturnsCollectionSize) {
StorageInterfaceImpl storage;
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- ASSERT_OK(storage.insertDocuments(
- opCtx, nss, transformInserts({BSON("_id" << 1), BSON("_id" << 2), BSON("_id" << 0)})));
+ ASSERT_OK(storage.insertDocuments(opCtx,
+ nss,
+ {{BSON("_id" << 1), SnapshotName(0)},
+ {BSON("_id" << 2), SnapshotName(0)},
+ {BSON("_id" << 0), SnapshotName(0)}}));
auto size = unittest::assertGet(storage.getCollectionSize(opCtx, nss));
ASSERT_NOT_EQUALS(0UL, size);
}
diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h
index a9b33a8b5c3..7526acfafa8 100644
--- a/src/mongo/db/repl/storage_interface_mock.h
+++ b/src/mongo/db/repl/storage_interface_mock.h
@@ -90,7 +90,7 @@ public:
const BSONObj idIndexSpec,
const std::vector<BSONObj>& secondaryIndexSpecs)>;
using InsertDocumentFn = stdx::function<Status(
- OperationContext* opCtx, const NamespaceString& nss, const BSONObj& doc)>;
+ OperationContext* opCtx, const NamespaceString& nss, const TimestampedBSONObj& doc)>;
using InsertDocumentsFn = stdx::function<Status(OperationContext* opCtx,
const NamespaceString& nss,
const std::vector<InsertStatement>& docs)>;
@@ -135,7 +135,7 @@ public:
Status insertDocument(OperationContext* opCtx,
const NamespaceString& nss,
- const BSONObj& doc) override {
+ const TimestampedBSONObj& doc) override {
return insertDocumentFn(opCtx, nss, doc);
};
@@ -270,7 +270,7 @@ public:
return Status{ErrorCodes::IllegalOperation, "CreateCollectionForBulkFn not implemented."};
};
InsertDocumentFn insertDocumentFn =
- [](OperationContext* opCtx, const NamespaceString& nss, const BSONObj& doc) {
+ [](OperationContext* opCtx, const NamespaceString& nss, const TimestampedBSONObj& doc) {
return Status{ErrorCodes::IllegalOperation, "InsertDocumentFn not implemented."};
};
InsertDocumentsFn insertDocumentsFn = [](OperationContext* opCtx,
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 1cefe0b2bf7..83757f2f310 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -477,7 +477,8 @@ void scheduleWritesToOplog(OperationContext* opCtx,
for (size_t i = begin; i < end; i++) {
// Add as unowned BSON to avoid unnecessary ref-count bumps.
// 'ops' will outlive 'docs' so the BSON lifetime will be guaranteed.
- docs.emplace_back(BSONObj(ops[i].raw.objdata()));
+ docs.emplace_back(
+ InsertStatement{ops[i].raw, SnapshotName(ops[i].getOpTime().getTimestamp())});
}
fassertStatusOK(40141,
@@ -1361,6 +1362,12 @@ StatusWith<OpTime> multiApply(OperationContext* opCtx,
// Update the transaction table to point to the latest oplog entries for each session id.
scheduleTxnTableUpdates(opCtx, workerPool, latestTxnRecords);
+
+ // Notify the storage engine that a replication batch has completed.
+ // This means that all the writes associated with the oplog entries in the batch are
+ // finished and no new writes with timestamps associated with those oplog entries will show
+ // up in the future.
+ getGlobalServiceContext()->getGlobalStorageEngine()->replicationBatchIsComplete();
}
// If any of the statuses is not ok, return error.
diff --git a/src/mongo/db/storage/README.md b/src/mongo/db/storage/README.md
index 8326f47c8ae..37699575ded 100644
--- a/src/mongo/db/storage/README.md
+++ b/src/mongo/db/storage/README.md
@@ -1,8 +1,8 @@
Storage Engine API
==================
-The purpose of the Storage Engine API is to allow for pluggable storage engines in MongoDB, see
-also the [Storage FAQ][]. This document gives a brief overview of the API, and provides pointers
+The purpose of the Storage Engine API is to allow for pluggable storage engines in MongoDB (refer
+to the [Storage FAQ][]). This document gives a brief overview of the API, and provides pointers
to places with more detailed documentation. Where referencing code, links are to the version that
was current at the time when the reference was made. Always compare with the latest version for
changes not yet reflected here. For questions on the API that are not addressed by this material,
@@ -35,15 +35,15 @@ indexes.
A RecordId is a unique identifier, assigned by the storage engine, for a specific document or entry
in a record store at a given time. For storage engines based on the KVEngine, the record identity is
fixed, but other storage engines, such as MMAPv1, may change it when updating a document. Note that
-this can be very expensive, as indexes map to the RecordId. A single document with a large array
-may have thousands of index entries, resulting in very expensive updates.
+changing record ids can be very expensive, as indexes map to the RecordId. A single document with a
+large array may have thousands of index entries, resulting in very expensive updates.
#### Cloning and bulk operations
Currently all cloning, [initial sync][] and other operations are done in terms of operating on
individual documents, though there is a BulkBuilder class for more efficiently building indexes.
### Locking and Concurrency
-MongoDB uses multi-granular intent locking, see the [Concurrency FAQ][]. In all cases, this will
+MongoDB uses multi-granular intent locking; see the [Concurrency FAQ][]. In all cases, this will
ensure that operations to meta-data, such as creation and deletion of record stores, are serialized
with respect to other accesses. Storage engines can choose to support document-level concurrency,
in which case the storage engine is responsible for any additional synchronization necessary. For
@@ -57,7 +57,7 @@ storage engine.
### Transactions
Each operation creates an OperationContext with a new RecoveryUnit, implemented by the storage
-engine, that lives until the operation finishes. Currently, query operations that return a cursor
+engine, that lives until the operation finishes. Currently, query operations that return a cursor
to the client live as long as that client cursor, with the operation context switching between its
own recovery unit and that of the client cursor. In a few other cases an internal command may use
an extra recovery unit as well. The recovery unit must implement transaction semantics as described
@@ -77,18 +77,18 @@ otherwise the guarantee of atomic updates on a document and all its indexes woul
Storage engines must provide snapshot isolation, either through locking (as is the case for the
MMAPv1 engine), through multi-version concurrency control (MVCC) or otherwise. The first read
implicitly establishes the snapshot. Operations can always see all changes they make in the context
-of a recovery unit, but other operations cannot until a successfull commit.
+of a recovery unit, but other operations cannot until a successful commit.
#### Durability
Once a transaction is committed, it is not necessarily durable: if, and only if, the server fails,
as a result of power loss or otherwise, the database may recover to an earlier point in time.
However, atomicity of transactions must remain preserved. Similarly, in a replica set, a primary
-that becomes unavailable may need to rollback to an earlier state when rejoining the replicas et,
+that becomes unavailable may need to roll back to an earlier state when rejoining the replica set,
if its changes were not yet seen by a majority of nodes. The RecoveryUnit implements methods to
allow operations to wait for their committed transactions to become durable.
A transaction may become visible to other transactions as soon as it commits, and a storage engine
-may use a group commit bundling a number of transactions to achieve durability. Alternatively, a
+may use a group commit, bundling a number of transactions to achieve durability. Alternatively, a
storage engine may wait for durability at commit time.
### Write Conflicts
@@ -98,11 +98,20 @@ in deadlock or violate other resource constraints. In such cases the storage eng
WriteConflictException to signal the transient failure. MongoDB will handle the exception, abort
and restart the transaction.
+### Point-in-time snapshot reads
+Two functions on the RecoveryUnit help storage engines implement point-in-time reads: setTimestamp()
+and selectSnapshot(). setTimestamp() is used by write transactions to label any forthcoming writes
+with a timestamp; these timestamps are then used to produce a point-in-time read transaction via a
+call to selectSnapshot() at the start of the read. The storage engine must produce the effect of
+reading from a snapshot that includes only writes with timestamps at or earlier than the
+selectSnapshot timestamp. This means that a point-in-time read may slice across prior write
+transactions, hiding only some of the data written by a given write transaction, if that
+transaction set a different timestamp before each of its writes.
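
As an illustration only (not part of this patch), a minimal sketch of how a caller might drive these
two hooks, assuming an OperationContext* opCtx on the write side, readOpCtx on the read side, and a
storage engine that implements both calls:

    // Write side: label everything in this unit of work with timestamp (5, 1).
    WriteUnitOfWork wuow(opCtx);
    Status tsStatus = opCtx->recoveryUnit()->setTimestamp(SnapshotName(Timestamp(5, 1)));
    invariant(tsStatus.isOK());
    // ... perform inserts/updates; each write is assigned timestamp (5, 1) ...
    wuow.commit();

    // Read side: request a point-in-time view at (5, 1). Writes stamped later, even if
    // already committed, must not be visible to this read transaction.
    Status snapStatus = readOpCtx->recoveryUnit()->selectSnapshot(SnapshotName(Timestamp(5, 1)));
    if (snapStatus.isOK()) {
        // ... issue reads; only data stamped at or before (5, 1) is visible ...
    }
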
Classes to implement
--------------------
-A storage engine should generally implement the following classes. See their definition for more
+A storage engine should generally implement the following classes. See their definitions for more
details.
* [KVEngine](kv/kv_engine.h)
diff --git a/src/mongo/db/storage/kv/kv_engine.h b/src/mongo/db/storage/kv/kv_engine.h
index cce831b2848..3350971bddc 100644
--- a/src/mongo/db/storage/kv/kv_engine.h
+++ b/src/mongo/db/storage/kv/kv_engine.h
@@ -264,6 +264,11 @@ public:
}
/**
+ * See `StorageEngine::replicationBatchIsComplete()`
+ */
+ virtual void replicationBatchIsComplete() const {};
+
+ /**
* The destructor will never be called from mongod, but may be called from tests.
* Engines may assume that this will only be called in the case of clean shutdown, even if
* cleanShutdown() hasn't been called.
diff --git a/src/mongo/db/storage/kv/kv_engine_test_snapshots.cpp b/src/mongo/db/storage/kv/kv_engine_test_snapshots.cpp
index be9bcb80fc7..5c3bd3f869a 100644
--- a/src/mongo/db/storage/kv/kv_engine_test_snapshots.cpp
+++ b/src/mongo/db/storage/kv/kv_engine_test_snapshots.cpp
@@ -221,7 +221,7 @@ TEST_F(SnapshotManagerTests, FailsWithNoCommittedSnapshot) {
ErrorCodes::ReadConcernMajorityNotAvailableYet);
// Now there is a committed snapshot.
- snapshotManager->setCommittedSnapshot(name);
+ snapshotManager->setCommittedSnapshot(name, Timestamp(name.asU64()));
ASSERT_OK(ru->setReadFromMajorityCommittedSnapshot());
// Not anymore!
@@ -238,7 +238,7 @@ TEST_F(SnapshotManagerTests, FailsAfterDropAllSnapshotsWhileYielded) {
// Start an operation using a committed snapshot.
auto name = prepareAndCreateSnapshot();
- snapshotManager->setCommittedSnapshot(name);
+ snapshotManager->setCommittedSnapshot(name, Timestamp(name.asU64()));
ASSERT_OK(op->recoveryUnit()->setReadFromMajorityCommittedSnapshot());
ASSERT_EQ(itCountOn(op), 0); // acquires a snapshot.
@@ -289,17 +289,17 @@ TEST_F(SnapshotManagerTests, BasicFunctionality) {
auto snap4 = prepareAndCreateSnapshot();
// If these fail, everything is busted.
- snapshotManager->setCommittedSnapshot(snap0);
+ snapshotManager->setCommittedSnapshot(snap0, Timestamp(snap0.asU64()));
ASSERT_EQ(itCountCommitted(), 0);
- snapshotManager->setCommittedSnapshot(snap1);
+ snapshotManager->setCommittedSnapshot(snap1, Timestamp(snap1.asU64()));
ASSERT_EQ(itCountCommitted(), 1);
// If this fails, the snapshot is from the 'create' time rather than the 'prepare' time.
- snapshotManager->setCommittedSnapshot(snap2);
+ snapshotManager->setCommittedSnapshot(snap2, Timestamp(snap2.asU64()));
ASSERT_EQ(itCountCommitted(), 2);
// If this fails, the snapshot contains writes that weren't yet committed.
- snapshotManager->setCommittedSnapshot(snap3);
+ snapshotManager->setCommittedSnapshot(snap3, Timestamp(snap3.asU64()));
ASSERT_EQ(itCountCommitted(), 3);
// This op should keep its original snapshot until abandoned.
@@ -308,7 +308,7 @@ TEST_F(SnapshotManagerTests, BasicFunctionality) {
ASSERT_EQ(itCountOn(longOp), 3);
// If this fails, the snapshot contains writes that were rolled back.
- snapshotManager->setCommittedSnapshot(snap4);
+ snapshotManager->setCommittedSnapshot(snap4, Timestamp(snap4.asU64()));
ASSERT_EQ(itCountCommitted(), 4);
// If this fails, longOp changed snapshots at an illegal time.
@@ -339,19 +339,19 @@ TEST_F(SnapshotManagerTests, UpdateAndDelete) {
deleteRecordAndCommit(id);
auto snapAfterDelete = prepareAndCreateSnapshot();
- snapshotManager->setCommittedSnapshot(snapBeforeInsert);
+ snapshotManager->setCommittedSnapshot(snapBeforeInsert, Timestamp(snapBeforeInsert.asU64()));
ASSERT_EQ(itCountCommitted(), 0);
ASSERT(!readRecordCommitted(id));
- snapshotManager->setCommittedSnapshot(snapDog);
+ snapshotManager->setCommittedSnapshot(snapDog, Timestamp(snapDog.asU64()));
ASSERT_EQ(itCountCommitted(), 1);
ASSERT_EQ(readStringCommitted(id), "Dog");
- snapshotManager->setCommittedSnapshot(snapCat);
+ snapshotManager->setCommittedSnapshot(snapCat, Timestamp(snapCat.asU64()));
ASSERT_EQ(itCountCommitted(), 1);
ASSERT_EQ(readStringCommitted(id), "Cat");
- snapshotManager->setCommittedSnapshot(snapAfterDelete);
+ snapshotManager->setCommittedSnapshot(snapAfterDelete, Timestamp(snapAfterDelete.asU64()));
ASSERT_EQ(itCountCommitted(), 0);
ASSERT(!readRecordCommitted(id));
}
diff --git a/src/mongo/db/storage/kv/kv_storage_engine.cpp b/src/mongo/db/storage/kv/kv_storage_engine.cpp
index 68156246d26..55d03fd0b52 100644
--- a/src/mongo/db/storage/kv/kv_storage_engine.cpp
+++ b/src/mongo/db/storage/kv/kv_storage_engine.cpp
@@ -376,4 +376,7 @@ bool KVStorageEngine::supportsRecoverToStableTimestamp() const {
return _engine->supportsRecoverToStableTimestamp();
}
+void KVStorageEngine::replicationBatchIsComplete() const {
+ return _engine->replicationBatchIsComplete();
+}
} // namespace mongo
diff --git a/src/mongo/db/storage/kv/kv_storage_engine.h b/src/mongo/db/storage/kv/kv_storage_engine.h
index 64ec316fb22..580e1c7d069 100644
--- a/src/mongo/db/storage/kv/kv_storage_engine.h
+++ b/src/mongo/db/storage/kv/kv_storage_engine.h
@@ -118,6 +118,8 @@ public:
virtual bool supportsRecoverToStableTimestamp() const override;
+ virtual void replicationBatchIsComplete() const override;
+
SnapshotManager* getSnapshotManager() const final;
void setJournalListener(JournalListener* jl) final;
diff --git a/src/mongo/db/storage/record_store.h b/src/mongo/db/storage/record_store.h
index c10407a7d6b..ca5032c941e 100644
--- a/src/mongo/db/storage/record_store.h
+++ b/src/mongo/db/storage/record_store.h
@@ -371,11 +371,38 @@ public:
virtual void deleteRecord(OperationContext* opCtx, const RecordId& dl) = 0;
+ virtual StatusWith<RecordId> insertRecordT(OperationContext* opCtx,
+ const char* data,
+ int len,
+ Timestamp timestamp,
+ bool enforceQuota) {
+ return insertRecord(opCtx, data, len, enforceQuota);
+ }
+
virtual StatusWith<RecordId> insertRecord(OperationContext* opCtx,
const char* data,
int len,
bool enforceQuota) = 0;
+ virtual Status insertRecordsT(OperationContext* opCtx,
+ std::vector<Record>* records,
+ std::vector<Timestamp>* timestamps,
+ bool enforceQuota) {
+ int index = 0;
+ for (auto& record : *records) {
+ StatusWith<RecordId> res = insertRecordT(opCtx,
+ record.data.data(),
+ record.data.size(),
+ (*timestamps)[index++],
+ enforceQuota);
+ if (!res.isOK())
+ return res.getStatus();
+
+ record.id = res.getValue();
+ }
+ return Status::OK();
+ }
+
virtual Status insertRecords(OperationContext* opCtx,
std::vector<Record>* records,
bool enforceQuota) {
@@ -399,6 +426,14 @@ public:
* On success, if idsOut is non-null the RecordIds of the inserted records will be written into
* it. It must have space for nDocs RecordIds.
*/
+ virtual Status insertRecordsWithDocWriterT(OperationContext* opCtx,
+ const DocWriter* const* docs,
+ Timestamp* timestamps,
+ size_t nDocs,
+ RecordId* idsOut = nullptr) {
+ return insertRecordsWithDocWriter(opCtx, docs, nDocs, idsOut);
+ };
+
virtual Status insertRecordsWithDocWriter(OperationContext* opCtx,
const DocWriter* const* docs,
size_t nDocs,
@@ -407,6 +442,16 @@ public:
/**
* A thin wrapper around insertRecordsWithDocWriter() to simplify handling of single DocWriters.
*/
+ StatusWith<RecordId> insertRecordWithDocWriterT(OperationContext* opCtx,
+ const DocWriter* doc,
+ Timestamp timestamp) {
+ RecordId out;
+ Status status = insertRecordsWithDocWriterT(opCtx, &doc, &timestamp, 1, &out);
+ if (!status.isOK())
+ return status;
+ return out;
+ }
+
StatusWith<RecordId> insertRecordWithDocWriter(OperationContext* opCtx, const DocWriter* doc) {
RecordId out;
Status status = insertRecordsWithDocWriter(opCtx, &doc, 1, &out);
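
For illustration only (not part of this patch), a sketch of driving the insertRecordsT() entry point
added above, assuming a RecordStore* rs, an OperationContext* opCtx already inside a WriteUnitOfWork,
and two previously built BSONObj values doc1 and doc2; 'timestamps' is parallel to 'records', so
element i stamps records[i]:

    // Hypothetical caller of insertRecordsT(); 'records' and 'timestamps' are parallel vectors.
    std::vector<Record> records;
    records.push_back({RecordId(), RecordData(doc1.objdata(), doc1.objsize())});
    records.push_back({RecordId(), RecordData(doc2.objdata(), doc2.objsize())});
    std::vector<Timestamp> timestamps = {Timestamp(1, 1), Timestamp(1, 2)};

    Status status = rs->insertRecordsT(opCtx, &records, &timestamps, /*enforceQuota=*/false);
    if (status.isOK()) {
        // On success, each records[i].id holds the RecordId assigned to the i-th document.
    }
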
diff --git a/src/mongo/db/storage/record_store_test_capped_visibility.cpp b/src/mongo/db/storage/record_store_test_capped_visibility.cpp
index 8ced75f97be..a304614715c 100644
--- a/src/mongo/db/storage/record_store_test_capped_visibility.cpp
+++ b/src/mongo/db/storage/record_store_test_capped_visibility.cpp
@@ -68,12 +68,6 @@ TEST(RecordStore_CappedVisibility, EmptyInitialState) {
ASSERT(!rs->getCursor(longLivedOp.get(), false)->next());
RecordId lowestHiddenId = doInsert(longLivedOp, rs);
-
- // Collection still looks empty to forward iteration but not reverse or seekExact.
- ASSERT(!rs->getCursor(longLivedOp.get(), true)->next());
- ASSERT_ID_EQ(rs->getCursor(longLivedOp.get(), false)->next(), lowestHiddenId);
- ASSERT_ID_EQ(rs->getCursor(longLivedOp.get())->seekExact(lowestHiddenId), lowestHiddenId);
-
RecordId otherId;
{
auto opCtx = harness->newOperationContext();
@@ -87,25 +81,21 @@ TEST(RecordStore_CappedVisibility, EmptyInitialState) {
otherId = doInsert(opCtx, rs);
- ASSERT(!rs->getCursor(opCtx.get(), true)->next());
+ // Can read own writes.
+ ASSERT_ID_EQ(rs->getCursor(opCtx.get(), true)->next(), otherId);
ASSERT_ID_EQ(rs->getCursor(opCtx.get(), false)->next(), otherId);
ASSERT_ID_EQ(rs->getCursor(opCtx.get())->seekExact(otherId), otherId);
wuow.commit();
-
- ASSERT(!rs->getCursor(opCtx.get(), true)->next());
- ASSERT_ID_EQ(rs->getCursor(opCtx.get(), false)->next(), otherId);
- ASSERT_ID_EQ(rs->getCursor(opCtx.get())->seekExact(otherId), otherId);
- ASSERT(!rs->getCursor(opCtx.get())->seekExact(lowestHiddenId));
}
// longLivedOp is still on old snapshot so it can't see otherId yet.
- ASSERT(!rs->getCursor(longLivedOp.get(), true)->next());
+ ASSERT_ID_EQ(rs->getCursor(longLivedOp.get(), true)->next(), lowestHiddenId);
ASSERT_ID_EQ(rs->getCursor(longLivedOp.get(), false)->next(), lowestHiddenId);
ASSERT_ID_EQ(rs->getCursor(longLivedOp.get())->seekExact(lowestHiddenId), lowestHiddenId);
ASSERT(!rs->getCursor(longLivedOp.get())->seekExact(otherId));
- // This makes all documents visible and lets longLivedOp get a new snapshot.
+ // Make all documents visible and let longLivedOp get a new snapshot.
longLivedWuow.commit();
ASSERT_ID_EQ(rs->getCursor(longLivedOp.get(), true)->next(), lowestHiddenId);
diff --git a/src/mongo/db/storage/record_store_test_harness.cpp b/src/mongo/db/storage/record_store_test_harness.cpp
index a257d17037a..d7bb992badd 100644
--- a/src/mongo/db/storage/record_store_test_harness.cpp
+++ b/src/mongo/db/storage/record_store_test_harness.cpp
@@ -131,7 +131,8 @@ TEST(RecordStoreTestHarness, Simple1InsertDocWroter) {
{
WriteUnitOfWork uow(opCtx.get());
DummyDocWriter dw;
- StatusWith<RecordId> res = rs->insertRecordWithDocWriter(opCtx.get(), &dw);
+ StatusWith<RecordId> res =
+ rs->insertRecordWithDocWriterT(opCtx.get(), &dw, Timestamp(1));
ASSERT_OK(res.getStatus());
loc1 = res.getValue();
uow.commit();
diff --git a/src/mongo/db/storage/record_store_test_insertrecord.cpp b/src/mongo/db/storage/record_store_test_insertrecord.cpp
index 5446eefbd18..93f759c29b8 100644
--- a/src/mongo/db/storage/record_store_test_insertrecord.cpp
+++ b/src/mongo/db/storage/record_store_test_insertrecord.cpp
@@ -127,7 +127,8 @@ TEST(RecordStoreTestHarness, InsertRecordUsingDocWriter) {
StringDocWriter docWriter("my record", false);
WriteUnitOfWork uow(opCtx.get());
- StatusWith<RecordId> res = rs->insertRecordWithDocWriter(opCtx.get(), &docWriter);
+ StatusWith<RecordId> res =
+ rs->insertRecordWithDocWriterT(opCtx.get(), &docWriter, Timestamp(1));
ASSERT_OK(res.getStatus());
loc = res.getValue();
uow.commit();
@@ -161,7 +162,8 @@ TEST(RecordStoreTestHarness, InsertMultipleRecordsUsingDocWriter) {
StringDocWriter docWriter(ss.str(), false);
WriteUnitOfWork uow(opCtx.get());
- StatusWith<RecordId> res = rs->insertRecordWithDocWriter(opCtx.get(), &docWriter);
+ StatusWith<RecordId> res =
+ rs->insertRecordWithDocWriterT(opCtx.get(), &docWriter, Timestamp(1));
ASSERT_OK(res.getStatus());
locs[i] = res.getValue();
uow.commit();
diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h
index 3baa174dcbc..fe9ed473163 100644
--- a/src/mongo/db/storage/recovery_unit.h
+++ b/src/mongo/db/storage/recovery_unit.h
@@ -133,6 +133,26 @@ public:
virtual SnapshotId getSnapshotId() const = 0;
/**
+ * Sets a timestamp to assign to future writes in a transaction.
+ * All subsequent writes will be assigned this timestamp.
+ * If setTimestamp() is called again, specifying a new timestamp, future writes will use this
+ * new timestamp but past writes remain with their originally assigned timestamps.
+ * Writes that occur before any setTimestamp() is called will be assigned the timestamp
+ * specified in the last setTimestamp() call in the transaction, at commit time.
+ */
+ virtual Status setTimestamp(SnapshotName timestamp) {
+ return Status::OK();
+ }
+
+ /**
+ * Chooses which snapshot to use for read transactions.
+ */
+ virtual Status selectSnapshot(SnapshotName timestamp) {
+ return Status(ErrorCodes::CommandNotSupported,
+ "point-in-time reads are not implemented for this storage engine");
+ }
+
+ /**
* A Change is an action that is registerChange()'d while a WriteUnitOfWork exists. The
* change is either rollback()'d or commit()'d when the WriteUnitOfWork goes out of scope.
*
diff --git a/src/mongo/db/storage/snapshot_manager.h b/src/mongo/db/storage/snapshot_manager.h
index ef588d1c1a2..b0dfc15c013 100644
--- a/src/mongo/db/storage/snapshot_manager.h
+++ b/src/mongo/db/storage/snapshot_manager.h
@@ -80,7 +80,7 @@ public:
* can be done later. In particular, cleaning up of old snapshots should be deferred until
* cleanupUnneededSnapshots is called.
*/
- virtual void setCommittedSnapshot(const SnapshotName& name) = 0;
+ virtual void setCommittedSnapshot(const SnapshotName& name, Timestamp ts) = 0;
/**
* Cleans up all snapshots older than the current committed snapshot.
diff --git a/src/mongo/db/storage/snapshot_name.h b/src/mongo/db/storage/snapshot_name.h
index eddc06b48bd..e7a2676bc92 100644
--- a/src/mongo/db/storage/snapshot_name.h
+++ b/src/mongo/db/storage/snapshot_name.h
@@ -39,7 +39,8 @@ namespace mongo {
class SnapshotName {
public:
explicit SnapshotName(uint64_t value) : _value(value) {}
- explicit SnapshotName(Timestamp ts) : SnapshotName(ts.asULL()) {}
+ explicit SnapshotName(Timestamp timestamp) : _value(timestamp.asULL()) {}
+ SnapshotName() : _value(0) {}
/**
* Returns a SnapshotName guaranteed to compare < all names of actual snapshots.
diff --git a/src/mongo/db/storage/storage_engine.h b/src/mongo/db/storage/storage_engine.h
index 5e20290e161..7af4e45b942 100644
--- a/src/mongo/db/storage/storage_engine.h
+++ b/src/mongo/db/storage/storage_engine.h
@@ -328,6 +328,15 @@ public:
*/
virtual void setInitialDataTimestamp(SnapshotName snapshotName) {}
+ /**
+ * Notifies the storage engine that a replication batch has completed.
+ * This means that all the writes associated with the oplog entries in the batch are
+ * finished and no new writes with timestamps associated with those oplog entries will show
+ * up in the future.
+ * This function can be used to ensure oplog visibility rules are not broken, for example.
+ */
+ virtual void replicationBatchIsComplete() const {};
+
// (CollectionName, IndexName)
typedef std::pair<std::string, std::string> CollectionIndexNamePair;
diff --git a/src/mongo/db/storage/wiredtiger/SConscript b/src/mongo/db/storage/wiredtiger/SConscript
index 2a6755c7f49..20ca6219747 100644
--- a/src/mongo/db/storage/wiredtiger/SConscript
+++ b/src/mongo/db/storage/wiredtiger/SConscript
@@ -37,6 +37,7 @@ if wiredtiger:
'wiredtiger_global_options.cpp',
'wiredtiger_index.cpp',
'wiredtiger_kv_engine.cpp',
+ 'wiredtiger_oplog_manager.cpp',
'wiredtiger_record_store.cpp',
'wiredtiger_recovery_unit.cpp',
'wiredtiger_session_cache.cpp',
@@ -113,11 +114,13 @@ if wiredtiger:
wtEnv.Library(
target='additional_wiredtiger_record_store_tests',
source=[
+ 'record_store_test_oplog.cpp',
'wiredtiger_record_store_test.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/storage/kv/kv_engine_core',
'$BUILD_DIR/mongo/db/storage/record_store_test_harness',
+ '$BUILD_DIR/mongo/util/clock_source_mock',
'storage_wiredtiger_mock',
],
)
diff --git a/src/mongo/db/storage/wiredtiger/record_store_test_oplog.cpp b/src/mongo/db/storage/wiredtiger/record_store_test_oplog.cpp
new file mode 100644
index 00000000000..55670cf30b5
--- /dev/null
+++ b/src/mongo/db/storage/wiredtiger/record_store_test_oplog.cpp
@@ -0,0 +1,319 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/storage/record_store_test_harness.h"
+
+#include "mongo/db/storage/record_store.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+StatusWith<RecordId> insertBSON(ServiceContext::UniqueOperationContext& opCtx,
+ std::unique_ptr<RecordStore>& rs,
+ const Timestamp& opTime) {
+ BSONObj obj = BSON("ts" << opTime);
+ WriteUnitOfWork wuow(opCtx.get());
+ Status status = rs->oplogDiskLocRegister(opCtx.get(), opTime);
+ if (!status.isOK())
+ return StatusWith<RecordId>(status);
+ StatusWith<RecordId> res = rs->insertRecord(opCtx.get(), obj.objdata(), obj.objsize(), false);
+ if (res.isOK())
+ wuow.commit();
+ return res;
+}
+
+RecordId _oplogOrderInsertOplog(OperationContext* opCtx,
+ const std::unique_ptr<RecordStore>& rs,
+ int inc) {
+ Timestamp opTime = Timestamp(5, inc);
+ Status status = rs->oplogDiskLocRegister(opCtx, opTime);
+ ASSERT_OK(status);
+ BSONObj obj = BSON("ts" << opTime);
+ StatusWith<RecordId> res = rs->insertRecord(opCtx, obj.objdata(), obj.objsize(), false);
+ ASSERT_OK(res.getStatus());
+ return res.getValue();
+}
+
+TEST(RecordStore_Oplog, OplogHack) {
+ std::unique_ptr<RecordStoreHarnessHelper> harnessHelper = newRecordStoreHarnessHelper();
+ // Use a large enough cappedMaxSize so that the limit is not reached by doing the inserts within
+ // the test itself.
+ const int64_t cappedMaxSize = 10 * 1024; // 10KB
+ std::unique_ptr<RecordStore> rs(
+ harnessHelper->newCappedRecordStore("local.oplog.foo", cappedMaxSize, -1));
+ {
+ ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
+
+ // always illegal
+ ASSERT_EQ(insertBSON(opCtx, rs, Timestamp(2, -1)).getStatus(), ErrorCodes::BadValue);
+
+ {
+ WriteUnitOfWork wuow(opCtx.get());
+ BSONObj obj = BSON("not_ts" << Timestamp(2, 1));
+ ASSERT_EQ(
+ rs->insertRecord(opCtx.get(), obj.objdata(), obj.objsize(), false).getStatus(),
+ ErrorCodes::BadValue);
+ }
+ {
+ WriteUnitOfWork wuow(opCtx.get());
+ BSONObj obj = BSON("ts"
+ << "not a Timestamp");
+ ASSERT_EQ(
+ rs->insertRecord(opCtx.get(), obj.objdata(), obj.objsize(), false).getStatus(),
+ ErrorCodes::BadValue);
+ }
+
+ ASSERT_EQ(insertBSON(opCtx, rs, Timestamp(-2, 1)).getStatus(), ErrorCodes::BadValue);
+
+ // success cases
+ ASSERT_EQ(insertBSON(opCtx, rs, Timestamp(1, 1)).getValue(), RecordId(1, 1));
+ ASSERT_EQ(insertBSON(opCtx, rs, Timestamp(1, 2)).getValue(), RecordId(1, 2));
+ ASSERT_EQ(insertBSON(opCtx, rs, Timestamp(2, 2)).getValue(), RecordId(2, 2));
+ }
+
+ // Make sure all are visible.
+ rs->waitForAllEarlierOplogWritesToBeVisible(harnessHelper->newOperationContext().get());
+
+ {
+ ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
+ WriteUnitOfWork wuow(opCtx.get());
+ // find start
+ ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(0, 1)), RecordId()); // nothing <=
+ ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(2, 1)), RecordId(1, 2)); // between
+ ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(2, 2)), RecordId(2, 2)); // ==
+ ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(2, 3)), RecordId(2, 2)); // > highest
+ }
+
+ {
+ ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
+ rs->cappedTruncateAfter(opCtx.get(), RecordId(2, 2), false); // no-op
+ }
+
+ {
+ ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
+ ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(2, 3)), RecordId(2, 2));
+ }
+
+ {
+ ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
+ rs->cappedTruncateAfter(opCtx.get(), RecordId(1, 2), false); // deletes 2,2
+ }
+
+ {
+ ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
+ ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(2, 3)), RecordId(1, 2));
+ }
+
+ {
+ ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
+ rs->cappedTruncateAfter(opCtx.get(), RecordId(1, 2), true); // deletes 1,2
+ }
+
+ {
+ ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
+ ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(2, 3)), RecordId(1, 1));
+ }
+
+ {
+ ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
+ WriteUnitOfWork wuow(opCtx.get());
+ ASSERT_OK(rs->truncate(opCtx.get())); // deletes 1,1 and leaves collection empty
+ wuow.commit();
+ }
+
+ {
+ ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
+ ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(2, 3)), RecordId());
+ }
+}
+
+TEST(RecordStore_Oplog, OplogHackOnNonOplog) {
+ std::unique_ptr<RecordStoreHarnessHelper> harnessHelper = newRecordStoreHarnessHelper();
+ std::unique_ptr<RecordStore> rs(harnessHelper->newNonCappedRecordStore("local.NOT_oplog.foo"));
+
+ ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
+
+ BSONObj obj = BSON("ts" << Timestamp(2, -1));
+ {
+ WriteUnitOfWork wuow(opCtx.get());
+ ASSERT_OK(rs->insertRecord(opCtx.get(), obj.objdata(), obj.objsize(), false).getStatus());
+ wuow.commit();
+ }
+ ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(0, 1)), boost::none);
+}
+
+
+TEST(RecordStore_Oplog, OplogOrder) {
+ std::unique_ptr<RecordStoreHarnessHelper> harnessHelper(newRecordStoreHarnessHelper());
+ std::unique_ptr<RecordStore> rs(
+ harnessHelper->newCappedRecordStore("local.oplog.rs", 100000, -1));
+
+ RecordId id1;
+
+ { // first insert a document
+ ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
+ {
+ WriteUnitOfWork uow(opCtx.get());
+ id1 = _oplogOrderInsertOplog(opCtx.get(), rs, 1);
+ uow.commit();
+ }
+ }
+
+ // Make sure it is visible.
+ rs->waitForAllEarlierOplogWritesToBeVisible(harnessHelper->newOperationContext().get());
+
+ {
+ ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
+ auto cursor = rs->getCursor(opCtx.get());
+ auto record = cursor->seekExact(id1);
+ ASSERT(record);
+ ASSERT_EQ(id1, record->id);
+ ASSERT(!cursor->next());
+ }
+
+ {
+ // now we insert 2 docs, but commit the 2nd one first.
+ // we make sure we can't find the 2nd until the first is committed.
+ ServiceContext::UniqueOperationContext earlyReader(harnessHelper->newOperationContext());
+
+ auto earlyCursor = rs->getCursor(earlyReader.get());
+ ASSERT_EQ(earlyCursor->seekExact(id1)->id, id1);
+ earlyCursor->save();
+ earlyReader->recoveryUnit()->abandonSnapshot();
+
+ auto client1 = harnessHelper->serviceContext()->makeClient("c1");
+ auto t1 = harnessHelper->newOperationContext(client1.get());
+ WriteUnitOfWork w1(t1.get());
+ _oplogOrderInsertOplog(t1.get(), rs, 20);
+ // do not commit yet
+
+ { // create 2nd doc
+ auto client2 = harnessHelper->serviceContext()->makeClient("c2");
+ auto t2 = harnessHelper->newOperationContext(client2.get());
+ {
+ WriteUnitOfWork w2(t2.get());
+ _oplogOrderInsertOplog(t2.get(), rs, 30);
+ w2.commit();
+ }
+ }
+
+ { // Other operations should not be able to see 2nd doc until w1 commits.
+ earlyCursor->restore();
+ ASSERT(!earlyCursor->next());
+
+ auto client2 = harnessHelper->serviceContext()->makeClient("c2");
+ auto opCtx = harnessHelper->newOperationContext(client2.get());
+ auto cursor = rs->getCursor(opCtx.get());
+ auto record = cursor->seekExact(id1);
+ ASSERT_EQ(id1, record->id);
+ ASSERT(!cursor->next());
+ }
+
+ w1.commit();
+ }
+
+ rs->waitForAllEarlierOplogWritesToBeVisible(harnessHelper->newOperationContext().get());
+
+ { // now all 3 docs should be visible
+ auto client2 = harnessHelper->serviceContext()->makeClient("c2");
+ auto opCtx = harnessHelper->newOperationContext(client2.get());
+ auto cursor = rs->getCursor(opCtx.get());
+ auto record = cursor->seekExact(id1);
+ ASSERT_EQ(id1, record->id);
+ ASSERT(cursor->next());
+ ASSERT(cursor->next());
+ ASSERT(!cursor->next());
+ }
+
+    // Roll back the last two oplog entries, then insert entries with older optimes and ensure that
+ // the visibility rules aren't violated. See SERVER-21645
+ {
+ auto client2 = harnessHelper->serviceContext()->makeClient("c2");
+ auto opCtx = harnessHelper->newOperationContext(client2.get());
+ rs->cappedTruncateAfter(opCtx.get(), id1, /*inclusive*/ false);
+ }
+
+ rs->waitForAllEarlierOplogWritesToBeVisible(harnessHelper->newOperationContext().get());
+
+ {
+ // Now we insert 2 docs with timestamps earlier than before, but commit the 2nd one first.
+ // We make sure we can't find the 2nd until the first is committed.
+ ServiceContext::UniqueOperationContext earlyReader(harnessHelper->newOperationContext());
+ auto earlyCursor = rs->getCursor(earlyReader.get());
+ ASSERT_EQ(earlyCursor->seekExact(id1)->id, id1);
+ earlyCursor->save();
+ earlyReader->recoveryUnit()->abandonSnapshot();
+
+ auto client1 = harnessHelper->serviceContext()->makeClient("c1");
+ auto t1 = harnessHelper->newOperationContext(client1.get());
+ WriteUnitOfWork w1(t1.get());
+ _oplogOrderInsertOplog(t1.get(), rs, 2);
+ // do not commit yet
+
+ { // create 2nd doc
+ auto client2 = harnessHelper->serviceContext()->makeClient("c2");
+ auto t2 = harnessHelper->newOperationContext(client2.get());
+ {
+ WriteUnitOfWork w2(t2.get());
+ _oplogOrderInsertOplog(t2.get(), rs, 3);
+ w2.commit();
+ }
+ }
+
+ { // Other operations should not be able to see 2nd doc until w1 commits.
+ ASSERT(earlyCursor->restore());
+ ASSERT(!earlyCursor->next());
+
+ auto client2 = harnessHelper->serviceContext()->makeClient("c2");
+ auto opCtx = harnessHelper->newOperationContext(client2.get());
+ auto cursor = rs->getCursor(opCtx.get());
+ auto record = cursor->seekExact(id1);
+ ASSERT_EQ(id1, record->id);
+ ASSERT(!cursor->next());
+ }
+
+ w1.commit();
+ }
+
+ rs->waitForAllEarlierOplogWritesToBeVisible(harnessHelper->newOperationContext().get());
+
+ { // now all 3 docs should be visible
+ ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
+ auto cursor = rs->getCursor(opCtx.get());
+ auto record = cursor->seekExact(id1);
+ ASSERT_EQ(id1, record->id);
+ ASSERT(cursor->next());
+ ASSERT(cursor->next());
+ ASSERT(!cursor->next());
+ }
+}
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index 43f4c7b545f..9752fa11c0e 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -201,6 +201,9 @@ public:
_sessionCache->waitUntilDurable(forceCheckpoint, stableCheckpoint);
}
}
+ } catch (const WriteConflictException& wce) {
+ // Temporary: remove this after WT-3483
+ warning() << "Checkpoint encountered a write conflict exception.";
} catch (const AssertionException& exc) {
invariant(exc.code() == ErrorCodes::ShutdownInProgress);
}
@@ -726,9 +729,9 @@ std::unique_ptr<RecordStore> WiredTigerKVEngine::getGroupedRecordStore(
std::unique_ptr<WiredTigerRecordStore> ret;
if (prefix == KVPrefix::kNotPrefixed) {
- ret = stdx::make_unique<StandardWiredTigerRecordStore>(opCtx, params);
+ ret = stdx::make_unique<StandardWiredTigerRecordStore>(this, opCtx, params);
} else {
- ret = stdx::make_unique<PrefixedWiredTigerRecordStore>(opCtx, params, prefix);
+ ret = stdx::make_unique<PrefixedWiredTigerRecordStore>(this, opCtx, params, prefix);
}
ret->postConstructorInit(opCtx);
@@ -1030,4 +1033,29 @@ bool WiredTigerKVEngine::supportsRecoverToStableTimestamp() const {
return _checkpointThread->supportsRecoverToStableTimestamp();
}
+
+void WiredTigerKVEngine::initializeOplogManager(OperationContext* opCtx,
+ const std::string& uri,
+ WiredTigerRecordStore* oplogRecordStore) {
+ stdx::unique_lock<stdx::mutex> lock(_oplogManagerMutex);
+ if (_oplogManagerCount == 0)
+ _oplogManager.reset(new WiredTigerOplogManager(opCtx, uri, oplogRecordStore));
+ _oplogManagerCount++;
+}
+
+void WiredTigerKVEngine::deleteOplogManager() {
+ stdx::unique_lock<stdx::mutex> lock(_oplogManagerMutex);
+ invariant(_oplogManagerCount > 0);
+ _oplogManagerCount--;
+ if (_oplogManagerCount == 0)
+ _oplogManager.reset();
+}
+
+void WiredTigerKVEngine::replicationBatchIsComplete() const {
+ stdx::unique_lock<stdx::mutex> lock(_oplogManagerMutex);
+ if (_oplogManager) {
+ _oplogManager->triggerJournalFlush();
+ }
+}
+
} // namespace mongo
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
index 383935c937d..b9758b814e1 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
@@ -39,6 +39,7 @@
#include "mongo/bson/ordering.h"
#include "mongo/db/storage/kv/kv_engine.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/mutex.h"
@@ -48,6 +49,7 @@ namespace mongo {
class ClockSource;
class JournalListener;
+class WiredTigerRecordStore;
class WiredTigerSessionCache;
class WiredTigerSizeStorer;
@@ -181,6 +183,30 @@ public:
void syncSizeInfo(bool sync) const;
+ /*
+     * Initializes the oplog manager, which controls oplog entry visibility for reads, and tracks
+     * it with a reference count.
+     * The oplog manager object is held by this class, but its lifetime follows the oplog record
+     * store (the record store backing the oplog collection). If multiple oplog record stores are
+     * created, the first one constructed creates the manager and the last one deleted destroys it.
+ */
+ void initializeOplogManager(OperationContext* opCtx,
+ const std::string& uri,
+ WiredTigerRecordStore* oplogRecordStore);
+ void deleteOplogManager();
+
+ WiredTigerOplogManager* getOplogManager() const {
+ return _oplogManager.get();
+ }
+
+ /*
+ * This function is called when replication has completed a batch. In this function, we
+     * refresh our oplog visibility read-at-timestamp value.
+ */
+ void replicationBatchIsComplete() const override;
+
/**
* Sets the implementation for `initRsOplogBackgroundThread` (allowing tests to skip the
     * background job, for example). Intended to be called from a MONGO_INITIALIZER and therefore in
@@ -213,6 +239,15 @@ private:
WT_CONNECTION* _conn;
WT_EVENT_HANDLER _eventHandler;
std::unique_ptr<WiredTigerSessionCache> _sessionCache;
+
+ // Mutex used to protect use of _oplogManager and _oplogManagerCount by this instance of KV
+ // engine.
+    // The record store's own uses of the oplog manager are managed by the record store itself and
+    // do not need this lock, except at shutdown (deletion).
+ mutable stdx::mutex _oplogManagerMutex;
+ std::unique_ptr<WiredTigerOplogManager> _oplogManager;
+ std::size_t _oplogManagerCount = 0;
+
std::string _canonicalName;
std::string _path;
std::string _wtOpenConfig;
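
Side note on the initializeOplogManager()/deleteOplogManager() contract documented above: the
first-constructs, last-destroys behaviour is ordinary mutex-guarded reference counting. A standalone
sketch of the same pattern (plain C++, not MongoDB code; names are illustrative):

    #include <cassert>
    #include <cstddef>
    #include <memory>
    #include <mutex>

    struct Manager {};  // stand-in for the shared, lazily created manager object

    class Engine {
    public:
        // First caller constructs the shared manager; later callers only bump the count.
        void acquireManager() {
            std::lock_guard<std::mutex> lk(_mutex);
            if (_count++ == 0) {
                _manager.reset(new Manager());
            }
        }

        // Last caller to release destroys the manager.
        void releaseManager() {
            std::lock_guard<std::mutex> lk(_mutex);
            assert(_count > 0);  // acquire/release calls must be paired
            if (--_count == 0) {
                _manager.reset();
            }
        }

    private:
        std::mutex _mutex;
        std::unique_ptr<Manager> _manager;
        std::size_t _count = 0;
    };
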
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp
new file mode 100644
index 00000000000..286b35fca7a
--- /dev/null
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp
@@ -0,0 +1,221 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage
+
+#include "mongo/platform/basic.h"
+
+#include <cstring>
+
+#include "mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/util/concurrency/idle_thread_block.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace {
+// This is the minimum valid timestamp; it can be used for reads that need to see all untimestamped
+// data but no timestamped data. We cannot use 0 here because 0 means see all timestamped data.
+const char minimumTimestampStr[] = "1";
+} // namespace
+
+MONGO_FP_DECLARE(WTPausePrimaryOplogDurabilityLoop);
+
+WiredTigerOplogManager::WiredTigerOplogManager(OperationContext* opCtx,
+ const std::string& uri,
+ WiredTigerRecordStore* oplogRecordStore) {
+ // Prime the oplog read timestamp.
+ auto sessionCache = WiredTigerRecoveryUnit::get(opCtx)->getSessionCache();
+ char allCommittedTimestampBuf[TIMESTAMP_BUF_SIZE];
+ _fetchAllCommittedValue(sessionCache->conn(), allCommittedTimestampBuf);
+ _setOplogReadTimestamp(allCommittedTimestampBuf);
+
+ std::unique_ptr<SeekableRecordCursor> reverseOplogCursor =
+ oplogRecordStore->getCursor(opCtx, false /* false = reverse cursor */);
+ auto lastRecord = reverseOplogCursor->next();
+ _oplogMaxAtStartup = lastRecord ? lastRecord->id : RecordId();
+
+ _oplogJournalThread = stdx::thread(
+ &WiredTigerOplogManager::_oplogJournalThreadLoop, this, sessionCache, oplogRecordStore);
+}
+
+WiredTigerOplogManager::~WiredTigerOplogManager() {
+ {
+ stdx::lock_guard<stdx::mutex> lk(_oplogVisibilityStateMutex);
+ _shuttingDown = true;
+ }
+
+ if (_oplogJournalThread.joinable()) {
+ _opsWaitingForJournalCV.notify_one();
+ _oplogJournalThread.join();
+ }
+}
+
+void WiredTigerOplogManager::waitForAllEarlierOplogWritesToBeVisible(
+ const WiredTigerRecordStore* oplogRecordStore, OperationContext* opCtx) const {
+ invariant(opCtx->lockState()->isNoop() || !opCtx->lockState()->inAWriteUnitOfWork());
+
+ // In order to reliably detect rollback situations, we need to fetch the latestVisibleTimestamp
+ // prior to querying the end of the oplog.
+ auto currentLatestVisibleTimestamp = getOplogReadTimestamp();
+
+ // Procedure: issue a read on a reverse cursor (which is not subject to the oplog visibility
+ // rules), see what is last, and wait for that to become visible.
+ std::unique_ptr<SeekableRecordCursor> cursor =
+ oplogRecordStore->getCursor(opCtx, false /* false = reverse cursor */);
+ auto lastRecord = cursor->next();
+ if (!lastRecord) {
+ LOG(2) << "Trying to query an empty oplog";
+ opCtx->recoveryUnit()->abandonSnapshot();
+ return;
+ }
+ const auto waitingFor = lastRecord->id;
+ // Close transaction before we wait.
+ opCtx->recoveryUnit()->abandonSnapshot();
+
+ stdx::unique_lock<stdx::mutex> lk(_oplogVisibilityStateMutex);
+ opCtx->waitForConditionOrInterrupt(_opsBecameVisibleCV, lk, [&] {
+ auto newLatestVisibleTimestamp = getOplogReadTimestamp();
+ if (newLatestVisibleTimestamp < currentLatestVisibleTimestamp) {
+ LOG(1) << "oplog latest visible timestamp went backwards";
+ // If the visibility went backwards, this means a rollback occurred.
+ // Thus, we are finished waiting.
+ return true;
+ }
+ currentLatestVisibleTimestamp = newLatestVisibleTimestamp;
+
+ // currentLatestVisibleTimestamp may still be Timestamp "1" if no oplog documents have been
+ // inserted since the last mongod restart. In that case, fall back to _oplogMaxAtStartup,
+ // the timestamp the newest oplog document had when it was written.
+ RecordId latestVisible =
+ std::max(RecordId(currentLatestVisibleTimestamp), _oplogMaxAtStartup);
+ if (latestVisible < waitingFor) {
+ LOG(2) << "Operation is waiting for " << waitingFor << "; latestVisible is "
+ << currentLatestVisibleTimestamp << " oplogMaxAtStartup is "
+ << _oplogMaxAtStartup;
+ }
+ return latestVisible >= waitingFor;
+ });
+}
+
+void WiredTigerOplogManager::triggerJournalFlush() {
+ stdx::lock_guard<stdx::mutex> lk(_oplogVisibilityStateMutex);
+ if (!_opsWaitingForJournal) {
+ _opsWaitingForJournal = true;
+ _opsWaitingForJournalCV.notify_one();
+ }
+}
+
+void WiredTigerOplogManager::_oplogJournalThreadLoop(
+ WiredTigerSessionCache* sessionCache, WiredTigerRecordStore* oplogRecordStore) noexcept {
+ Client::initThread("WTOplogJournalThread");
+
+ // This thread updates the oplog read timestamp, the timestamp used to read from the oplog with
+ // forward cursors. The timestamp hides oplog entries that may already be committed but still
+ // have uncommitted entries (holes) with earlier timestamps behind them.
+ while (true) {
+ stdx::unique_lock<stdx::mutex> lk(_oplogVisibilityStateMutex);
+ {
+ MONGO_IDLE_THREAD_BLOCK;
+ _opsWaitingForJournalCV.wait(lk,
+ [&] { return _shuttingDown || _opsWaitingForJournal; });
+ }
+
+ while (!_shuttingDown && MONGO_FAIL_POINT(WTPausePrimaryOplogDurabilityLoop)) {
+ lk.unlock();
+ sleepmillis(10);
+ lk.lock();
+ }
+
+ if (_shuttingDown) {
+ log() << "oplog journal thread loop shutting down";
+ return;
+ }
+ _opsWaitingForJournal = false;
+ lk.unlock();
+
+ char allCommittedTimestampBuf[TIMESTAMP_BUF_SIZE];
+ _fetchAllCommittedValue(sessionCache->conn(), allCommittedTimestampBuf);
+
+ std::uint64_t newTimestamp;
+ auto status = parseNumberFromStringWithBase(allCommittedTimestampBuf, 16, &newTimestamp);
+ fassertStatusOK(38002, status);
+
+ if (newTimestamp == _oplogReadTimestamp.load()) {
+ LOG(2) << "no new oplog entries were made visible: " << newTimestamp;
+ continue;
+ }
+
+ // In order to avoid oplog holes after an unclean shutdown, we must ensure this proposed
+ // oplog read timestamp's documents are durable before publishing that timestamp.
+ sessionCache->waitUntilDurable(/*forceCheckpoint=*/false, false);
+
+ lk.lock();
+
+ // Publish the new timestamp value.
+ _setOplogReadTimestamp(allCommittedTimestampBuf);
+ _opsBecameVisibleCV.notify_all();
+ lk.unlock();
+
+ // Wake up any await_data cursors and tell them more data might be visible now.
+ oplogRecordStore->notifyCappedWaitersIfNeeded();
+ }
+}
+
+std::uint64_t WiredTigerOplogManager::getOplogReadTimestamp() const {
+ return _oplogReadTimestamp.load();
+}
+
+void WiredTigerOplogManager::setOplogReadTimestamp(Timestamp ts) {
+ _oplogReadTimestamp.store(ts.asULL());
+}
+
+void WiredTigerOplogManager::_setOplogReadTimestamp(char buf[TIMESTAMP_BUF_SIZE]) {
+ std::uint64_t newTimestamp;
+ auto status = parseNumberFromStringWithBase(buf, 16, &newTimestamp);
+ fassertStatusOK(38001, status);
+ _oplogReadTimestamp.store(newTimestamp);
+ LOG(2) << "setting new oplogReadTimestamp: " << newTimestamp;
+}
+
+void WiredTigerOplogManager::_fetchAllCommittedValue(WT_CONNECTION* conn,
+ char buf[TIMESTAMP_BUF_SIZE]) {
+ // Fetch the latest all_committed value from the storage engine. This value will be a
+ // timestamp that has no holes (uncommitted transactions with lower timestamps) behind it.
+ auto wtstatus = conn->query_timestamp(conn, buf, "get=all_committed");
+ if (wtstatus == WT_NOTFOUND) {
+ // Treat this as the lowest possible timestamp; we need to see all pre-existing data but no
+ // new (timestamped) data.
+ std::strncpy(buf, minimumTimestampStr, TIMESTAMP_BUF_SIZE);
+ } else {
+ invariantWTOK(wtstatus);
+ }
+}
+
+} // namespace mongo
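
The manager above reduces to one published 64-bit read timestamp, a trigger condition variable, and a visibility condition variable. A minimal self-contained sketch of that publisher/waiter pattern (it substitutes a plain pending counter for WiredTiger's all_committed query and omits the durability wait):

    #include <algorithm>
    #include <atomic>
    #include <condition_variable>
    #include <cstdint>
    #include <mutex>
    #include <thread>

    class VisibilityManager {
    public:
        // Writers call this after committing an entry at `ts` to ask for it to become visible.
        void requestVisibilityUpTo(std::uint64_t ts) {
            std::lock_guard<std::mutex> lk(_mutex);
            _pending = std::max(_pending, ts);
            _workCV.notify_one();
        }

        // Readers block here until everything at or before `ts` has been published as visible.
        void waitUntilVisible(std::uint64_t ts) {
            std::unique_lock<std::mutex> lk(_mutex);
            _visibleCV.wait(lk, [&] { return _readTimestamp.load() >= ts; });
        }

        std::uint64_t readTimestamp() const {
            return _readTimestamp.load();
        }

        void shutdown() {
            {
                std::lock_guard<std::mutex> lk(_mutex);
                _shuttingDown = true;
            }
            _workCV.notify_one();
            _thread.join();
        }

    private:
        void _loop() {
            std::unique_lock<std::mutex> lk(_mutex);
            while (true) {
                _workCV.wait(lk, [&] { return _shuttingDown || _pending > _readTimestamp.load(); });
                if (_shuttingDown)
                    return;
                std::uint64_t target = _pending;
                lk.unlock();
                // The real loop would call waitUntilDurable() and re-query all_committed here.
                _readTimestamp.store(target);  // publish the new read timestamp
                lk.lock();
                _visibleCV.notify_all();
            }
        }

        std::mutex _mutex;
        std::condition_variable _workCV;     // signaled when there is work to publish
        std::condition_variable _visibleCV;  // signaled after a new timestamp is published
        std::uint64_t _pending = 0;          // guarded by _mutex
        bool _shuttingDown = false;          // guarded by _mutex
        std::atomic<std::uint64_t> _readTimestamp{0};
        std::thread _thread{&VisibilityManager::_loop, this};
    };

    int main() {
        VisibilityManager vm;
        vm.requestVisibilityUpTo(5);
        vm.waitUntilVisible(5);  // returns once the background thread has published >= 5
        vm.shutdown();
        return vm.readTimestamp() >= 5 ? 0 : 1;
    }
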
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h
new file mode 100644
index 00000000000..ebbdf8a0108
--- /dev/null
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h
@@ -0,0 +1,97 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/thread.h"
+
+namespace mongo {
+
+class WiredTigerRecordStore;
+class WiredTigerSessionCache;
+
+
+// Manages oplog visibility by periodically querying WiredTiger's all_committed timestamp value
+// and then using that timestamp for all transactions that read the oplog collection.
+class WiredTigerOplogManager {
+ MONGO_DISALLOW_COPYING(WiredTigerOplogManager);
+
+public:
+ WiredTigerOplogManager(OperationContext* opCtx,
+ const std::string& uri,
+ WiredTigerRecordStore* oplogRecordStore);
+ ~WiredTigerOplogManager();
+
+ // The oplogReadTimestamp is the timestamp used for oplog reads, to prevent readers from
+ // reading past uncommitted transactions (which may create "holes" in the oplog after an
+ // unclean shutdown).
+ std::uint64_t getOplogReadTimestamp() const;
+ void setOplogReadTimestamp(Timestamp ts);
+
+ // Signals the oplog journal thread to flush the journal and then advance the oplog read timestamp.
+ void triggerJournalFlush();
+
+ // Waits until all writes committed as of this point have become visible (that is, until no
+ // holes remain behind them in the oplog).
+ void waitForAllEarlierOplogWritesToBeVisible(const WiredTigerRecordStore* oplogRecordStore,
+ OperationContext* opCtx) const;
+
+private:
+ // A timestamp is 8 bytes, so its hexadecimal string representation takes 2 characters per
+ // byte (00 through ff); the buffer needs one extra byte for the null terminator.
+ static const int TIMESTAMP_BUF_SIZE = 2 * /*TIMESTAMP_SIZE*/ 8 + 1;
+
+ void _oplogJournalThreadLoop(WiredTigerSessionCache* sessionCache,
+ WiredTigerRecordStore* oplogRecordStore) noexcept;
+
+ void _setOplogReadTimestamp(char buf[TIMESTAMP_BUF_SIZE]);
+
+ void _fetchAllCommittedValue(WT_CONNECTION* conn, char buf[TIMESTAMP_BUF_SIZE]);
+
+ stdx::thread _oplogJournalThread;
+ mutable stdx::mutex _oplogVisibilityStateMutex;
+ mutable stdx::condition_variable
+ _opsWaitingForJournalCV; // Signaled to trigger a journal flush.
+ mutable stdx::condition_variable
+ _opsBecameVisibleCV; // Signaled when a journal flush is complete.
+
+ bool _shuttingDown = false; // Guarded by _oplogVisibilityStateMutex.
+
+ // The RecordId of the newest oplog document at startup. Used as a floor in
+ // waitForAllEarlierOplogWritesToBeVisible().
+ RecordId _oplogMaxAtStartup = RecordId(0); // Guarded by _oplogVisibilityStateMutex.
+ bool _opsWaitingForJournal = false; // Guarded by _oplogVisibilityStateMutex.
+
+ AtomicUInt64 _oplogReadTimestamp;
+};
+} // namespace mongo
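
A small standalone sketch of the TIMESTAMP_BUF_SIZE arithmetic above: an 8-byte timestamp formats to at most 16 hexadecimal characters plus a null terminator, and parses back with base 16. Here strtoull stands in for parseNumberFromStringWithBase:

    #include <cassert>
    #include <cinttypes>
    #include <cstdint>
    #include <cstdio>
    #include <cstdlib>

    // 8-byte timestamp -> 16 hex characters, plus 1 byte for the null terminator.
    constexpr int kTimestampBufSize = 2 * 8 + 1;

    int main() {
        const std::uint64_t ts = (std::uint64_t{1503419652} << 32) | 7;  // Timestamp(secs, inc)

        // Format the timestamp the way WiredTiger expects it: as a hexadecimal string.
        char buf[kTimestampBufSize];
        int written = std::snprintf(buf, sizeof(buf), "%" PRIx64, ts);
        assert(written > 0 && written < kTimestampBufSize);

        // Parse it back in base 16, as the manager does when it reads all_committed.
        std::uint64_t parsed = std::strtoull(buf, nullptr, 16);
        assert(parsed == ts);
        std::printf("timestamp %s round-trips to %" PRIu64 "\n", buf, parsed);
        return 0;
    }
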
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_prefixed_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_prefixed_record_store_test.cpp
index fb2e795b2f9..94606dc57cc 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_prefixed_record_store_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_prefixed_record_store_test.cpp
@@ -40,8 +40,11 @@
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/json.h"
#include "mongo/db/operation_context_noop.h"
+#include "mongo/db/storage/kv/kv_engine_test_harness.h"
#include "mongo/db/storage/kv/kv_prefix.h"
#include "mongo/db/storage/record_store_test_harness.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_record_store_oplog_stones.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h"
@@ -51,6 +54,7 @@
#include "mongo/stdx/memory.h"
#include "mongo/unittest/temp_dir.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/clock_source_mock.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/scopeguard.h"
@@ -63,42 +67,27 @@ using std::stringstream;
class PrefixedWiredTigerHarnessHelper final : public RecordStoreHarnessHelper {
public:
- static WT_CONNECTION* createConnection(StringData dbpath, StringData extraStrings) {
- WT_CONNECTION* conn = NULL;
-
- std::stringstream ss;
- ss << "create,";
- ss << "statistics=(all),";
- ss << extraStrings;
- string config = ss.str();
- int ret = wiredtiger_open(dbpath.toString().c_str(), NULL, config.c_str(), &conn);
- ASSERT_OK(wtRCToStatus(ret));
- ASSERT(conn);
-
- return conn;
- }
-
PrefixedWiredTigerHarnessHelper()
: _dbpath("wt_test"),
- _conn(createConnection(_dbpath.path(), "")),
- _sessionCache(new WiredTigerSessionCache(_conn)) {}
-
- PrefixedWiredTigerHarnessHelper(StringData extraStrings)
- : _dbpath("wt_test"),
- _conn(createConnection(_dbpath.path(), extraStrings)),
- _sessionCache(new WiredTigerSessionCache(_conn)) {}
-
- ~PrefixedWiredTigerHarnessHelper() {
- delete _sessionCache;
- _conn->close(_conn, NULL);
- }
+ _engine(new WiredTigerKVEngine(kWiredTigerEngineName,
+ _dbpath.path(),
+ _cs.get(),
+ "",
+ 1,
+ false,
+ false,
+ false,
+ false)) {}
+
+ PrefixedWiredTigerHarnessHelper(StringData extraStrings) : _dbpath("wt_test") {}
virtual std::unique_ptr<RecordStore> newNonCappedRecordStore() {
return newNonCappedRecordStore("a.b");
}
virtual std::unique_ptr<RecordStore> newNonCappedRecordStore(const std::string& ns) {
- WiredTigerRecoveryUnit* ru = new WiredTigerRecoveryUnit(_sessionCache);
+ WiredTigerRecoveryUnit* ru =
+ checked_cast<WiredTigerRecoveryUnit*>(_engine->newRecoveryUnit());
OperationContextNoop opCtx(ru);
string uri = "table:" + ns;
@@ -127,7 +116,7 @@ public:
params.sizeStorer = nullptr;
auto ret = stdx::make_unique<PrefixedWiredTigerRecordStore>(
- &opCtx, params, KVPrefix::generateNextPrefix());
+ _engine.get(), &opCtx, params, KVPrefix::generateNextPrefix());
ret->postConstructorInit(&opCtx);
return std::move(ret);
}
@@ -140,7 +129,8 @@ public:
virtual std::unique_ptr<RecordStore> newCappedRecordStore(const std::string& ns,
int64_t cappedMaxSize,
int64_t cappedMaxDocs) {
- WiredTigerRecoveryUnit* ru = new WiredTigerRecoveryUnit(_sessionCache);
+ WiredTigerRecoveryUnit* ru =
+ checked_cast<WiredTigerRecoveryUnit*>(_engine->newRecoveryUnit());
OperationContextNoop opCtx(ru);
string uri = "table:a.b";
@@ -171,13 +161,15 @@ public:
params.cappedCallback = nullptr;
params.sizeStorer = nullptr;
- auto ret = stdx::make_unique<PrefixedWiredTigerRecordStore>(&opCtx, params, prefix);
+ auto ret =
+ stdx::make_unique<PrefixedWiredTigerRecordStore>(_engine.get(), &opCtx, params, prefix);
ret->postConstructorInit(&opCtx);
return std::move(ret);
}
virtual std::unique_ptr<RecoveryUnit> newRecoveryUnit() final {
- return stdx::make_unique<WiredTigerRecoveryUnit>(_sessionCache);
+ return std::unique_ptr<WiredTigerRecoveryUnit>(
+ checked_cast<WiredTigerRecoveryUnit*>(_engine->newRecoveryUnit()));
}
virtual bool supportsDocLocking() final {
@@ -185,13 +177,14 @@ public:
}
virtual WT_CONNECTION* conn() const {
- return _conn;
+ return _engine->getConnection();
}
private:
unittest::TempDir _dbpath;
- WT_CONNECTION* _conn;
- WiredTigerSessionCache* _sessionCache;
+ const std::unique_ptr<ClockSource> _cs = stdx::make_unique<ClockSourceMock>();
+
+ std::unique_ptr<WiredTigerKVEngine> _engine;
};
std::unique_ptr<HarnessHelper> makeHarnessHelper() {
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
index 661472bd552..d0d19578afd 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
@@ -63,9 +63,6 @@
#include "mongo/util/scopeguard.h"
#include "mongo/util/time_support.h"
-//#define RS_ITERATOR_TRACE(x) log() << "WTRS::Iterator " << x
-#define RS_ITERATOR_TRACE(x)
-
namespace mongo {
using std::unique_ptr;
@@ -79,23 +76,36 @@ static const int kMaximumRecordStoreVersion = 1;
MONGO_STATIC_ASSERT(kCurrentRecordStoreVersion >= kMinimumRecordStoreVersion);
MONGO_STATIC_ASSERT(kCurrentRecordStoreVersion <= kMaximumRecordStoreVersion);
-bool shouldUseOplogHack(OperationContext* opCtx, const std::string& uri) {
+void checkOplogFormatVersion(OperationContext* opCtx, const std::string& uri) {
StatusWith<BSONObj> appMetadata = WiredTigerUtil::getApplicationMetadata(opCtx, uri);
- if (!appMetadata.isOK()) {
- return false;
- }
+ fassertStatusOK(39999, appMetadata);
- return (appMetadata.getValue().getIntField("oplogKeyExtractionVersion") == 1);
+ fassertNoTrace(39998, appMetadata.getValue().getIntField("oplogKeyExtractionVersion") == 1);
}
-
} // namespace
MONGO_FP_DECLARE(WTWriteConflictException);
MONGO_FP_DECLARE(WTWriteConflictExceptionForReads);
-MONGO_FP_DECLARE(WTPausePrimaryOplogDurabilityLoop);
const std::string kWiredTigerEngineName = "wiredTiger";
+class WiredTigerRecordStore::OplogInsertChange final : public RecoveryUnit::Change {
+public:
+ OplogInsertChange(WiredTigerOplogManager* om) : _om(om) {}
+
+ void commit() final {
+ _om->triggerJournalFlush();
+ }
+
+ void rollback() final {
+ // Trigger even on rollback since it might make later commits visible.
+ _om->triggerJournalFlush();
+ }
+
+private:
+ WiredTigerOplogManager* const _om;
+};
+
class WiredTigerRecordStore::OplogStones::InsertChange final : public RecoveryUnit::Change {
public:
InsertChange(OplogStones* oplogStones,
@@ -608,7 +618,9 @@ StatusWith<std::string> WiredTigerRecordStore::generateCreateString(
return StatusWith<std::string>(ss);
}
-WiredTigerRecordStore::WiredTigerRecordStore(OperationContext* ctx, Params params)
+WiredTigerRecordStore::WiredTigerRecordStore(WiredTigerKVEngine* kvEngine,
+ OperationContext* ctx,
+ Params params)
: RecordStore(params.ns),
_uri(params.uri),
_tableId(WiredTigerSession::genTableId()),
@@ -622,11 +634,11 @@ WiredTigerRecordStore::WiredTigerRecordStore(OperationContext* ctx, Params param
_cappedSleep(0),
_cappedSleepMS(0),
_cappedCallback(params.cappedCallback),
+ _shuttingDown(false),
_cappedDeleteCheckCount(0),
- _useOplogHack(shouldUseOplogHack(ctx, _uri)),
_sizeStorer(params.sizeStorer),
_sizeStorerCounter(0),
- _shuttingDown(false) {
+ _kvEngine(kvEngine) {
Status versionStatus = WiredTigerUtil::checkApplicationMetadataFormatVersion(
ctx, _uri, kMinimumRecordStoreVersion, kMaximumRecordStoreVersion)
.getStatus();
@@ -654,12 +666,15 @@ WiredTigerRecordStore::WiredTigerRecordStore(OperationContext* ctx, Params param
WiredTigerUtil::useTableLogging(NamespaceString(ns()),
getGlobalReplSettings().usingReplSets())));
}
+
+ if (_isOplog) {
+ checkOplogFormatVersion(ctx, _uri);
+ }
}
WiredTigerRecordStore::~WiredTigerRecordStore() {
{
stdx::lock_guard<stdx::timed_mutex> lk(_cappedDeleterMutex);
- stdx::lock_guard<stdx::mutex> lk2(_uncommittedRecordIdsMutex);
_shuttingDown = true;
}
@@ -672,9 +687,9 @@ WiredTigerRecordStore::~WiredTigerRecordStore() {
_oplogStones->kill();
}
- if (_oplogJournalThread.joinable()) {
- _opsWaitingForJournalCV.notify_one();
- _oplogJournalThread.join();
+ if (_isOplog) {
+ // Delete oplog visibility manager on KV engine.
+ _kvEngine->deleteOplogManager();
}
}
@@ -683,7 +698,6 @@ void WiredTigerRecordStore::postConstructorInit(OperationContext* opCtx) {
std::unique_ptr<SeekableRecordCursor> cursor = getCursor(opCtx, /*forward=*/false);
if (auto record = cursor->next()) {
int64_t max = record->id.repr();
- _oplog_highestSeen = record->id;
_nextIdNum.store(1 + max);
if (_sizeStorer) {
@@ -718,9 +732,8 @@ void WiredTigerRecordStore::postConstructorInit(OperationContext* opCtx) {
}
if (_isOplog) {
- _oplogJournalThread = stdx::thread(&WiredTigerRecordStore::_oplogJournalThreadLoop,
- this,
- WiredTigerRecoveryUnit::get(opCtx)->getSessionCache());
+ invariant(_kvEngine);
+ _kvEngine->initializeOplogManager(opCtx, _uri, this);
}
}
@@ -946,9 +959,6 @@ int64_t WiredTigerRecordStore::cappedDeleteAsNeeded_inlock(OperationContext* opC
if (newestIdToDelete >= justInserted) // TODO: use oldest uncommitted instead
break;
- if (_shuttingDown)
- break;
-
WT_ITEM old_value;
invariantWTOK(truncateEnd->get_value(truncateEnd, &old_value));
@@ -956,6 +966,9 @@ int64_t WiredTigerRecordStore::cappedDeleteAsNeeded_inlock(OperationContext* opC
sizeSaved += old_value.size;
stdx::lock_guard<stdx::mutex> cappedCallbackLock(_cappedCallbackMutex);
+ if (_shuttingDown)
+ break;
+
if (_cappedCallback) {
uassertStatusOK(_cappedCallback->aboutToDeleteCapped(
opCtx,
@@ -1091,14 +1104,23 @@ void WiredTigerRecordStore::reclaimOplog(OperationContext* opCtx) {
<< " records totaling to " << _dataSize.load() << " bytes";
}
+Status WiredTigerRecordStore::insertRecordsT(OperationContext* opCtx,
+ std::vector<Record>* records,
+ std::vector<Timestamp>* timestamps,
+ bool enforceQuota) {
+ return _insertRecords(opCtx, records->data(), timestamps->data(), records->size());
+}
+
Status WiredTigerRecordStore::insertRecords(OperationContext* opCtx,
std::vector<Record>* records,
bool enforceQuota) {
- return _insertRecords(opCtx, records->data(), records->size());
+ invariant(false);
+ return Status::OK();
}
Status WiredTigerRecordStore::_insertRecords(OperationContext* opCtx,
Record* records,
+ Timestamp* timestamps,
size_t nRecords) {
// We are kind of cheating on capped collections since we write all of them at once ....
// Simplest way out would be to just block vector writes for everything except oplog ?
@@ -1115,20 +1137,24 @@ Status WiredTigerRecordStore::_insertRecords(OperationContext* opCtx,
WT_CURSOR* c = curwrap.get();
invariant(c);
+ if (_isOplog) {
+ // Register a change to notify the oplog journal flusher thread when this transaction
+ // finishes.
+ opCtx->recoveryUnit()->registerChange(new OplogInsertChange(_kvEngine->getOplogManager()));
+ }
+
RecordId highestId = RecordId();
dassert(nRecords != 0);
for (size_t i = 0; i < nRecords; i++) {
auto& record = records[i];
- if (_useOplogHack) {
+ if (_isOplog) {
StatusWith<RecordId> status =
oploghack::extractKey(record.data.data(), record.data.size());
if (!status.isOK())
return status.getStatus();
record.id = status.getValue();
} else if (_isCapped) {
- stdx::lock_guard<stdx::mutex> lk(_uncommittedRecordIdsMutex);
record.id = _nextId();
- _addUncommittedRecordId_inlock(opCtx, record.id);
} else {
record.id = _nextId();
}
@@ -1136,14 +1162,21 @@ Status WiredTigerRecordStore::_insertRecords(OperationContext* opCtx,
highestId = record.id;
}
- if (_useOplogHack && (highestId > _oplog_highestSeen)) {
- stdx::lock_guard<stdx::mutex> lk(_uncommittedRecordIdsMutex);
- if (highestId > _oplog_highestSeen)
- _oplog_highestSeen = highestId;
- }
-
for (size_t i = 0; i < nRecords; i++) {
auto& record = records[i];
+ if (_isOplog) {
+ Timestamp ts;
+ if (timestamps[i].isNull()) {
+ // If the timestamp is 0, that probably means someone inserted a document directly
+ // into the oplog. In this case, use the RecordId as the timestamp, since they are
+ // one and the same.
+ ts = Timestamp(record.id.repr());
+ } else {
+ ts = timestamps[i];
+ }
+ LOG(4) << "inserting record into oplog with timestamp " << ts.asULL();
+ fassertStatusOK(39001, opCtx->recoveryUnit()->setTimestamp(SnapshotName(ts)));
+ }
setKey(c, record.id);
WiredTigerItem value(record.data.data(), record.data.size());
c->set_value(c, value.Get());
@@ -1165,55 +1198,47 @@ Status WiredTigerRecordStore::_insertRecords(OperationContext* opCtx,
return Status::OK();
}
+StatusWith<RecordId> WiredTigerRecordStore::insertRecordT(
+ OperationContext* opCtx, const char* data, int len, Timestamp timestamp, bool enforceQuota) {
+ Record record = {RecordId(), RecordData(data, len)};
+ Status status = _insertRecords(opCtx, &record, &timestamp, 1);
+ if (!status.isOK())
+ return StatusWith<RecordId>(status);
+ return StatusWith<RecordId>(record.id);
+}
+
StatusWith<RecordId> WiredTigerRecordStore::insertRecord(OperationContext* opCtx,
const char* data,
int len,
bool enforceQuota) {
Record record = {RecordId(), RecordData(data, len)};
- Status status = _insertRecords(opCtx, &record, 1);
+ Timestamp timestamp;
+ Status status = _insertRecords(opCtx, &record, &timestamp, 1);
if (!status.isOK())
return StatusWith<RecordId>(status);
return StatusWith<RecordId>(record.id);
}
-void WiredTigerRecordStore::_dealtWithCappedId(SortedRecordIds::iterator it, bool didCommit) {
- invariant(it->isNormal());
- stdx::lock_guard<stdx::mutex> lk(_uncommittedRecordIdsMutex);
- if (didCommit && _isOplog && *it != _oplog_highestSeen) {
- // Defer removal from _uncommittedRecordIds until it is durable. We don't need to wait for
- // durability of ops that didn't commit because they won't become durable.
- // As an optimization, we only defer visibility until durable if new ops were created while
- // we were pending. This makes single-threaded w>1 workloads faster and is safe because
- // durability follows commit order for commits that are fully sequenced (B doesn't call
- // commit until after A's commit call returns).
- const bool wasEmpty = _opsWaitingForJournal.empty();
- _opsWaitingForJournal.push_back(it);
- if (wasEmpty) {
- _opsWaitingForJournalCV.notify_one();
- }
- } else {
- _uncommittedRecordIds.erase(it);
- _opsBecameVisibleCV.notify_all();
- }
+bool WiredTigerRecordStore::isOpHidden_forTest(const RecordId& id) const {
+ invariant(id.repr() > 0);
+ invariant(_kvEngine->getOplogManager());
+ return _kvEngine->getOplogManager()->getOplogReadTimestamp() <
+ static_cast<std::uint64_t>(id.repr());
}
-bool WiredTigerRecordStore::isCappedHidden(const RecordId& id) const {
- stdx::lock_guard<stdx::mutex> lk(_uncommittedRecordIdsMutex);
- if (_uncommittedRecordIds.empty()) {
- return false;
+void WiredTigerRecordStore::notifyCappedWaitersIfNeeded() {
+ stdx::lock_guard<stdx::mutex> cappedCallbackLock(_cappedCallbackMutex);
+ // This wakes up cursors blocking in await_data.
+ if (_cappedCallback) {
+ _cappedCallback->notifyCappedWaitersIfNeeded();
}
- return _uncommittedRecordIds.front() <= id;
}
-RecordId WiredTigerRecordStore::lowestCappedHiddenRecord() const {
- stdx::lock_guard<stdx::mutex> lk(_uncommittedRecordIdsMutex);
- return _uncommittedRecordIds.empty() ? RecordId() : _uncommittedRecordIds.front();
-}
-
-Status WiredTigerRecordStore::insertRecordsWithDocWriter(OperationContext* opCtx,
- const DocWriter* const* docs,
- size_t nDocs,
- RecordId* idsOut) {
+Status WiredTigerRecordStore::insertRecordsWithDocWriterT(OperationContext* opCtx,
+ const DocWriter* const* docs,
+ Timestamp* timestamps,
+ size_t nDocs,
+ RecordId* idsOut) {
std::unique_ptr<Record[]> records(new Record[nDocs]);
// First get all the sizes so we can allocate a single buffer for all documents. Eventually it
@@ -1236,7 +1261,7 @@ Status WiredTigerRecordStore::insertRecordsWithDocWriter(OperationContext* opCtx
}
invariant(pos == (buffer.get() + totalSize));
- Status s = _insertRecords(opCtx, records.get(), nDocs);
+ Status s = _insertRecords(opCtx, records.get(), timestamps, nDocs);
if (!s.isOK())
return s;
@@ -1326,15 +1351,6 @@ StatusWith<RecordData> WiredTigerRecordStore::updateWithDamages(
return RecordData(static_cast<const char*>(value.data), value.size).getOwned();
}
-void WiredTigerRecordStore::_oplogSetStartHack(WiredTigerRecoveryUnit* wru) const {
- stdx::lock_guard<stdx::mutex> lk(_uncommittedRecordIdsMutex);
- if (_uncommittedRecordIds.empty()) {
- wru->setOplogReadTill(_oplog_highestSeen);
- } else {
- wru->setOplogReadTill(_uncommittedRecordIds.front());
- }
-}
-
std::unique_ptr<RecordCursor> WiredTigerRecordStore::getRandomCursor(
OperationContext* opCtx) const {
const char* extraConfig = "";
@@ -1505,113 +1521,20 @@ Status WiredTigerRecordStore::touch(OperationContext* opCtx, BSONObjBuilder* out
return Status(ErrorCodes::CommandNotSupported, "this storage engine does not support touch");
}
-Status WiredTigerRecordStore::oplogDiskLocRegister(OperationContext* opCtx,
- const Timestamp& opTime) {
- StatusWith<RecordId> id = oploghack::keyForOptime(opTime);
- if (!id.isOK())
- return id.getStatus();
-
- stdx::lock_guard<stdx::mutex> lk(_uncommittedRecordIdsMutex);
- _addUncommittedRecordId_inlock(opCtx, id.getValue());
- return Status::OK();
-}
-
-class WiredTigerRecordStore::CappedInsertChange : public RecoveryUnit::Change {
-public:
- CappedInsertChange(WiredTigerRecordStore* rs, SortedRecordIds::iterator it)
- : _rs(rs), _it(it) {}
-
- virtual void commit() {
- _rs->_dealtWithCappedId(_it, true);
- // Do not notify here because all committed inserts notify, always.
- }
-
- virtual void rollback() {
- // Notify on rollback since it might make later commits visible.
- _rs->_dealtWithCappedId(_it, false);
- stdx::lock_guard<stdx::mutex> lk(_rs->_cappedCallbackMutex);
- if (_rs->_cappedCallback)
- _rs->_cappedCallback->notifyCappedWaitersIfNeeded();
- }
-
-private:
- WiredTigerRecordStore* const _rs;
- const SortedRecordIds::iterator _it;
-};
-
-void WiredTigerRecordStore::_oplogJournalThreadLoop(WiredTigerSessionCache* sessionCache) try {
- Client::initThread("WTOplogJournalThread");
- while (true) {
- stdx::unique_lock<stdx::mutex> lk(_uncommittedRecordIdsMutex);
- {
- MONGO_IDLE_THREAD_BLOCK;
- _opsWaitingForJournalCV.wait(
- lk, [&] { return _shuttingDown || !_opsWaitingForJournal.empty(); });
- }
-
- while (!_shuttingDown && MONGO_FAIL_POINT(WTPausePrimaryOplogDurabilityLoop)) {
- lk.unlock();
- sleepmillis(10);
- lk.lock();
- }
-
- if (_shuttingDown)
- return;
-
- decltype(_opsWaitingForJournal) opsAboutToBeJournaled = {};
- _opsWaitingForJournal.swap(opsAboutToBeJournaled);
-
- lk.unlock();
- const bool forceCheckpoint = false;
- const bool stableCheckpoint = false;
- sessionCache->waitUntilDurable(forceCheckpoint, stableCheckpoint);
- lk.lock();
-
- for (auto&& op : opsAboutToBeJournaled) {
- _uncommittedRecordIds.erase(op);
- }
-
- _opsBecameVisibleCV.notify_all();
- lk.unlock();
-
- stdx::lock_guard<stdx::mutex> cappedCallbackLock(_cappedCallbackMutex);
- if (_cappedCallback) {
- _cappedCallback->notifyCappedWaitersIfNeeded();
- }
- }
-} catch (...) {
- std::terminate();
-}
-
void WiredTigerRecordStore::waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx) const {
- invariant(opCtx->lockState()->isNoop() || !opCtx->lockState()->inAWriteUnitOfWork());
-
- // This function must not start a WT transaction, otherwise we will get stuck in an infinite
- // loop of WCE handling when the getCursor() is called.
-
- stdx::unique_lock<stdx::mutex> lk(_uncommittedRecordIdsMutex);
- const auto waitingFor = _oplog_highestSeen;
- opCtx->waitForConditionOrInterrupt(_opsBecameVisibleCV, lk, [&] {
- return _uncommittedRecordIds.empty() || _uncommittedRecordIds.front() > waitingFor;
- });
-}
-
-void WiredTigerRecordStore::_addUncommittedRecordId_inlock(OperationContext* opCtx, RecordId id) {
- dassert(_uncommittedRecordIds.empty() || _uncommittedRecordIds.back() < id);
- SortedRecordIds::iterator it = _uncommittedRecordIds.insert(_uncommittedRecordIds.end(), id);
- invariant(it->isNormal());
- opCtx->recoveryUnit()->registerChange(new CappedInsertChange(this, it));
- _oplog_highestSeen = id;
+ auto oplogManager = _kvEngine->getOplogManager();
+ if (oplogManager) {
+ oplogManager->waitForAllEarlierOplogWritesToBeVisible(this, opCtx);
+ }
}
boost::optional<RecordId> WiredTigerRecordStore::oplogStartHack(
OperationContext* opCtx, const RecordId& startingPosition) const {
- if (!_useOplogHack)
+ if (!_isOplog)
return boost::none;
- {
- WiredTigerRecoveryUnit* wru = WiredTigerRecoveryUnit::get(opCtx);
- _oplogSetStartHack(wru);
+ if (_isOplog) {
+ WiredTigerRecoveryUnit::get(opCtx)->setIsOplogReader();
}
WiredTigerCursor cursor(_uri, _tableId, true, opCtx);
@@ -1641,7 +1564,7 @@ void WiredTigerRecordStore::updateStatsAfterRepair(OperationContext* opCtx,
}
RecordId WiredTigerRecordStore::_nextId() {
- invariant(!_useOplogHack);
+ invariant(!_isOplog);
RecordId out = RecordId(_nextIdNum.fetchAndAdd(1));
invariant(out.isNormal());
return out;
@@ -1754,9 +1677,25 @@ void WiredTigerRecordStore::cappedTruncateAfter(OperationContext* opCtx,
wuow.commit();
- if (_useOplogHack) {
- // Forget that we've ever seen a higher timestamp than we now have.
- _oplog_highestSeen = lastKeptId;
+ if (_isOplog) {
+ // Immediately rewind visibility to the truncation point, so that no entries past it can
+ // appear visible to readers.
+ Timestamp truncTs(lastKeptId.repr());
+
+
+ char commitTSConfigString["commit_timestamp="_sd.size() +
+ (8 * 2) /* 16 hexadecimal characters for an 8-byte timestamp */ + 1 /* trailing null */];
+ auto size = std::snprintf(commitTSConfigString,
+ sizeof(commitTSConfigString),
+ "commit_timestamp=%llx",
+ truncTs.asULL());
+
+ invariant(static_cast<std::size_t>(size) < sizeof(commitTSConfigString));
+ auto conn = WiredTigerRecoveryUnit::get(opCtx)->getSessionCache()->conn();
+ invariantWTOK(conn->set_timestamp(conn, commitTSConfigString));
+
+ _kvEngine->getOplogManager()->setOplogReadTimestamp(truncTs);
+ LOG(1) << "truncation new read timestamp: " << truncTs;
}
if (_oplogStones) {
@@ -1765,15 +1704,20 @@ void WiredTigerRecordStore::cappedTruncateAfter(OperationContext* opCtx,
}
}
+Status WiredTigerRecordStore::oplogDiskLocRegister(OperationContext* opCtx,
+ const Timestamp& opTime) {
+ // This starts a new transaction and gives it a timestamp.
+ // This is required for oplog visibility to work correctly, as WiredTiger uses the transaction
+ // list to determine where there are holes in the oplog.
+ return opCtx->recoveryUnit()->setTimestamp(SnapshotName(opTime));
+}
+
// Cursor Base:
WiredTigerRecordStoreCursorBase::WiredTigerRecordStoreCursorBase(OperationContext* opCtx,
const WiredTigerRecordStore& rs,
bool forward)
- : _rs(rs),
- _opCtx(opCtx),
- _forward(forward),
- _readUntilForOplog(WiredTigerRecoveryUnit::get(opCtx)->getOplogReadTill()) {
+ : _rs(rs), _opCtx(opCtx), _forward(forward) {
_cursor.emplace(rs.getURI(), rs.tableId(), true, opCtx);
}
@@ -1810,11 +1754,6 @@ boost::optional<Record> WiredTigerRecordStoreCursorBase::next() {
throw WriteConflictException();
}
- if (!isVisible(id)) {
- _eof = true;
- return {};
- }
-
WT_ITEM value;
invariantWTOK(c->get_value(c, &value));
@@ -1860,6 +1799,10 @@ void WiredTigerRecordStoreCursorBase::saveUnpositioned() {
}
bool WiredTigerRecordStoreCursorBase::restore() {
+ if (_rs._isOplog && _forward) {
+ WiredTigerRecoveryUnit::get(_opCtx)->setIsOplogReader();
+ }
+
if (!_cursor)
_cursor.emplace(_rs.getURI(), _rs.tableId(), true, _opCtx);
@@ -1920,32 +1863,13 @@ void WiredTigerRecordStoreCursorBase::reattachToOperationContext(OperationContex
// _cursor recreated in restore() to avoid risk of WT_ROLLBACK issues.
}
-bool WiredTigerRecordStoreCursorBase::isVisible(const RecordId& id) {
- if (!_rs._isCapped)
- return true;
-
- if (!_forward)
- return true;
-
- if (_readUntilForOplog.isNull() || !_rs._isOplog) {
- // this is the normal capped case
- return !_rs.isCappedHidden(id);
- }
-
- // this is for oplogs
- if (id == _readUntilForOplog) {
- // we allow if its been committed already
- return !_rs.isCappedHidden(id);
- }
-
- return id < _readUntilForOplog;
-}
-
// Standard Implementations:
-StandardWiredTigerRecordStore::StandardWiredTigerRecordStore(OperationContext* opCtx, Params params)
- : WiredTigerRecordStore(opCtx, params) {}
+StandardWiredTigerRecordStore::StandardWiredTigerRecordStore(WiredTigerKVEngine* kvEngine,
+ OperationContext* opCtx,
+ Params params)
+ : WiredTigerRecordStore(kvEngine, opCtx, params) {}
RecordId StandardWiredTigerRecordStore::getKey(WT_CURSOR* cursor) const {
std::int64_t recordId;
@@ -1972,7 +1896,7 @@ std::unique_ptr<SeekableRecordCursor> StandardWiredTigerRecordStore::getCursor(
!opCtx->lockState()->isCollectionLockedForMode(_ns, MODE_X)) {
throw WriteConflictException();
}
- _oplogSetStartHack(wru);
+ wru->setIsOplogReader();
}
return stdx::make_unique<WiredTigerRecordStoreStandardCursor>(opCtx, *this, forward);
@@ -2007,10 +1931,11 @@ bool WiredTigerRecordStoreStandardCursor::hasWrongPrefix(WT_CURSOR* cursor,
// Prefixed Implementations:
-PrefixedWiredTigerRecordStore::PrefixedWiredTigerRecordStore(OperationContext* opCtx,
+PrefixedWiredTigerRecordStore::PrefixedWiredTigerRecordStore(WiredTigerKVEngine* kvEngine,
+ OperationContext* opCtx,
Params params,
KVPrefix prefix)
- : WiredTigerRecordStore(opCtx, params), _prefix(prefix) {}
+ : WiredTigerRecordStore(kvEngine, opCtx, params), _prefix(prefix) {}
std::unique_ptr<SeekableRecordCursor> PrefixedWiredTigerRecordStore::getCursor(
OperationContext* opCtx, bool forward) const {
@@ -2022,7 +1947,7 @@ std::unique_ptr<SeekableRecordCursor> PrefixedWiredTigerRecordStore::getCursor(
!opCtx->lockState()->isCollectionLockedForMode(_ns, MODE_X)) {
throw WriteConflictException();
}
- _oplogSetStartHack(wru);
+ wru->setIsOplogReader();
}
return stdx::make_unique<WiredTigerRecordStorePrefixedCursor>(opCtx, *this, _prefix, forward);
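
One detail worth calling out from the insert path above: for oplog entries the RecordId and the Timestamp share the same 64-bit encoding (seconds in the high 32 bits, increment in the low 32 bits), which is why a null caller-supplied timestamp can fall back to the RecordId. A hedged sketch of that fallback, with packTimestamp as an illustrative helper rather than a real MongoDB function:

    #include <cassert>
    #include <cstdint>

    // Illustrative helper: pack a Timestamp (seconds, increment) into the 64-bit value that
    // also serves as the oplog RecordId.
    std::uint64_t packTimestamp(std::uint32_t secs, std::uint32_t inc) {
        return (std::uint64_t{secs} << 32) | inc;
    }

    int main() {
        // Key extracted from the document's "ts" field, as oploghack::extractKey would do.
        const std::uint64_t recordId = packTimestamp(1503419652, 3);

        // Caller supplied no timestamp (e.g. a direct insert into the oplog collection).
        const std::uint64_t suppliedTs = 0;

        // Fall back to the RecordId, since the two encodings are one and the same for oplog entries.
        const std::uint64_t commitTs = (suppliedTs == 0) ? recordId : suppliedTs;
        assert(commitTs == packTimestamp(1503419652, 3));
        return 0;
    }
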
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
index 7d3cfb683fc..95bf88ccd3e 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
@@ -39,6 +39,7 @@
#include "mongo/db/storage/capped_callback.h"
#include "mongo/db/storage/kv/kv_prefix.h"
#include "mongo/db/storage/record_store.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/stdx/condition_variable.h"
@@ -66,12 +67,10 @@ class WiredTigerSessionCache;
class WiredTigerSizeStorer;
extern const std::string kWiredTigerEngineName;
-typedef std::list<RecordId> SortedRecordIds;
class WiredTigerRecordStore : public RecordStore {
friend class WiredTigerRecordStoreCursorBase;
- // Only the `_isOplog` member? Move to protected?
friend class StandardWiredTigerRecordStore;
friend class PrefixedWiredTigerRecordStore;
@@ -113,7 +112,7 @@ public:
bool isReadOnly;
};
- WiredTigerRecordStore(OperationContext* opCtx, Params params);
+ WiredTigerRecordStore(WiredTigerKVEngine* kvEngine, OperationContext* opCtx, Params params);
virtual ~WiredTigerRecordStore();
@@ -140,6 +139,14 @@ public:
virtual void deleteRecord(OperationContext* opCtx, const RecordId& id);
+ virtual Status insertRecordsT(OperationContext* opCtx,
+ std::vector<Record>* records,
+ std::vector<Timestamp>* timestamps,
+ bool enforceQuota);
+
+ virtual StatusWith<RecordId> insertRecordT(
+ OperationContext* opCtx, const char* data, int len, Timestamp timestamp, bool enforceQuota);
+
virtual Status insertRecords(OperationContext* opCtx,
std::vector<Record>* records,
bool enforceQuota);
@@ -149,10 +156,19 @@ public:
int len,
bool enforceQuota);
+ virtual Status insertRecordsWithDocWriterT(OperationContext* opCtx,
+ const DocWriter* const* docs,
+ Timestamp* timestamps,
+ size_t nDocs,
+ RecordId* idsOut);
+
virtual Status insertRecordsWithDocWriter(OperationContext* opCtx,
const DocWriter* const* docs,
size_t nDocs,
- RecordId* idsOut);
+ RecordId* idsOut) {
+ invariant(false);
+ };
+
virtual Status updateRecord(OperationContext* opCtx,
const RecordId& oldLocation,
@@ -225,13 +241,6 @@ public:
Status updateCappedSize(OperationContext* opCtx, long long cappedSize) final;
- bool isOplog() const {
- return _isOplog;
- }
- bool usingOplogHack() const {
- return _useOplogHack;
- }
-
void setCappedCallback(CappedCallback* cb) {
stdx::lock_guard<stdx::mutex> lk(_cappedCallbackMutex);
_cappedCallback = cb;
@@ -251,8 +260,7 @@ public:
_sizeStorer = ss;
}
- bool isCappedHidden(const RecordId& id) const;
- RecordId lowestCappedHiddenRecord() const;
+ bool isOpHidden_forTest(const RecordId& id) const;
bool inShutdown() const;
@@ -269,6 +277,8 @@ public:
// Returns false if the oplog was dropped while waiting for a deletion request.
bool yieldAndAwaitOplogDeletionRequest(OperationContext* opCtx);
+ void notifyCappedWaitersIfNeeded();
+
class OplogStones;
// Exposed only for testing.
@@ -293,16 +303,16 @@ protected:
private:
class RandomCursor;
- class CappedInsertChange;
+ class OplogInsertChange;
class NumRecordsChange;
class DataSizeChange;
static WiredTigerRecoveryUnit* _getRecoveryUnit(OperationContext* opCtx);
- void _dealtWithCappedId(SortedRecordIds::iterator it, bool didCommit);
- void _addUncommittedRecordId_inlock(OperationContext* opCtx, RecordId id);
-
- Status _insertRecords(OperationContext* opCtx, Record* records, size_t nRecords);
+ Status _insertRecords(OperationContext* opCtx,
+ Record* records,
+ Timestamp* timestamps,
+ size_t nRecords);
RecordId _nextId();
void _setId(RecordId id);
@@ -310,8 +320,7 @@ private:
void _changeNumRecords(OperationContext* opCtx, int64_t diff);
void _increaseDataSize(OperationContext* opCtx, int64_t amount);
RecordData _getData(const WiredTigerCursor& cursor) const;
- void _oplogSetStartHack(WiredTigerRecoveryUnit* wru) const;
- void _oplogJournalThreadLoop(WiredTigerSessionCache* sessionCache);
+
const std::string _uri;
const uint64_t _tableId; // not persisted
@@ -331,18 +340,13 @@ private:
AtomicInt64 _cappedSleep;
AtomicInt64 _cappedSleepMS;
CappedCallback* _cappedCallback;
- stdx::mutex _cappedCallbackMutex; // guards _cappedCallback.
+ bool _shuttingDown;
+ stdx::mutex _cappedCallbackMutex; // guards _cappedCallback and _shuttingDown
// See comment in ::cappedDeleteAsNeeded
int _cappedDeleteCheckCount;
mutable stdx::timed_mutex _cappedDeleterMutex;
- const bool _useOplogHack;
-
- SortedRecordIds _uncommittedRecordIds;
- RecordId _oplog_highestSeen;
- mutable stdx::mutex _uncommittedRecordIdsMutex;
-
AtomicInt64 _nextIdNum;
AtomicInt64 _dataSize;
AtomicInt64 _numRecords;
@@ -350,22 +354,18 @@ private:
WiredTigerSizeStorer* _sizeStorer; // not owned, can be NULL
int _sizeStorerCounter;
- bool _shuttingDown;
+ WiredTigerKVEngine* _kvEngine; // not owned.
// Non-null if this record store is underlying the active oplog.
std::shared_ptr<OplogStones> _oplogStones;
-
- // These use the _uncommittedRecordIdsMutex and are only used when _isOplog is true.
- stdx::condition_variable _opsWaitingForJournalCV;
- mutable stdx::condition_variable _opsBecameVisibleCV;
- std::vector<SortedRecordIds::iterator> _opsWaitingForJournal;
- stdx::thread _oplogJournalThread;
};
class StandardWiredTigerRecordStore final : public WiredTigerRecordStore {
public:
- StandardWiredTigerRecordStore(OperationContext* opCtx, Params params);
+ StandardWiredTigerRecordStore(WiredTigerKVEngine* kvEngine,
+ OperationContext* opCtx,
+ Params params);
virtual std::unique_ptr<SeekableRecordCursor> getCursor(OperationContext* opCtx,
bool forward) const override;
@@ -390,7 +390,10 @@ protected:
class PrefixedWiredTigerRecordStore final : public WiredTigerRecordStore {
public:
- PrefixedWiredTigerRecordStore(OperationContext* opCtx, Params params, KVPrefix prefix);
+ PrefixedWiredTigerRecordStore(WiredTigerKVEngine* kvEngine,
+ OperationContext* opCtx,
+ Params params,
+ KVPrefix prefix);
virtual std::unique_ptr<SeekableRecordCursor> getCursor(OperationContext* opCtx,
bool forward) const override;
@@ -466,7 +469,6 @@ protected:
boost::optional<WiredTigerCursor> _cursor;
bool _eof = false;
RecordId _lastReturnedId; // If null, need to seek to first/last record.
- const RecordId _readUntilForOplog;
private:
bool isVisible(const RecordId& id);
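
The new isOpHidden_forTest() predicate declared above is just a comparison against the published oplog read timestamp. A minimal sketch of that rule, assuming the RecordId-equals-timestamp encoding used for oplog entries:

    #include <atomic>
    #include <cassert>
    #include <cstdint>

    // The published oplog read timestamp (what getOplogReadTimestamp() returns).
    std::atomic<std::uint64_t> oplogReadTimestamp{0};

    // Mirrors the isOpHidden_forTest() rule: an entry stays hidden while the read
    // timestamp has not yet caught up to its RecordId.
    bool isHidden(std::uint64_t recordId) {
        return oplogReadTimestamp.load() < recordId;
    }

    int main() {
        const std::uint64_t id = 42;
        assert(isHidden(id));          // nothing published yet, entry is invisible
        oplogReadTimestamp.store(42);  // the durability loop publishes a timestamp >= id
        assert(!isHidden(id));         // now visible to forward oplog cursors
        return 0;
    }
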
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp
index 75b5c11e673..5261cfc73ba 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp
@@ -45,7 +45,6 @@
#include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_record_store_oplog_stones.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h"
-#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_size_storer.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_util.h"
#include "mongo/stdx/memory.h"
@@ -222,179 +221,6 @@ StatusWith<RecordId> insertBSON(ServiceContext::UniqueOperationContext& opCtx,
return res;
}
-// TODO make generic
-TEST(WiredTigerRecordStoreTest, OplogHack) {
- std::unique_ptr<RecordStoreHarnessHelper> harnessHelper = newRecordStoreHarnessHelper();
- // Use a large enough cappedMaxSize so that the limit is not reached by doing the inserts within
- // the test itself.
- const int64_t cappedMaxSize = 10 * 1024; // 10KB
- unique_ptr<RecordStore> rs(
- harnessHelper->newCappedRecordStore("local.oplog.foo", cappedMaxSize, -1));
- {
- ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
-
- // always illegal
- ASSERT_EQ(insertBSON(opCtx, rs, Timestamp(2, -1)).getStatus(), ErrorCodes::BadValue);
-
- {
- BSONObj obj = BSON("not_ts" << Timestamp(2, 1));
- ASSERT_EQ(
- rs->insertRecord(opCtx.get(), obj.objdata(), obj.objsize(), false).getStatus(),
- ErrorCodes::BadValue);
-
- obj = BSON("ts"
- << "not a Timestamp");
- ASSERT_EQ(
- rs->insertRecord(opCtx.get(), obj.objdata(), obj.objsize(), false).getStatus(),
- ErrorCodes::BadValue);
- }
-
- // currently dasserts
- // ASSERT_EQ(insertBSON(opCtx, rs, BSON("ts" << Timestamp(-2,1))).getStatus(),
- // ErrorCodes::BadValue);
-
- // success cases
- ASSERT_EQ(insertBSON(opCtx, rs, Timestamp(1, 1)).getValue(), RecordId(1, 1));
-
- ASSERT_EQ(insertBSON(opCtx, rs, Timestamp(1, 2)).getValue(), RecordId(1, 2));
-
- ASSERT_EQ(insertBSON(opCtx, rs, Timestamp(2, 2)).getValue(), RecordId(2, 2));
- }
-
- {
- ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
- // find start
- ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(0, 1)), RecordId()); // nothing <=
- ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(2, 1)), RecordId(1, 2)); // between
- ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(2, 2)), RecordId(2, 2)); // ==
- ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(2, 3)), RecordId(2, 2)); // > highest
- }
-
- {
- ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
- rs->cappedTruncateAfter(opCtx.get(), RecordId(2, 2), false); // no-op
- }
-
- {
- ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
- ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(2, 3)), RecordId(2, 2));
- }
-
- {
- ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
- rs->cappedTruncateAfter(opCtx.get(), RecordId(1, 2), false); // deletes 2,2
- }
-
- {
- ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
- ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(2, 3)), RecordId(1, 2));
- }
-
- {
- ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
- rs->cappedTruncateAfter(opCtx.get(), RecordId(1, 2), true); // deletes 1,2
- }
-
- {
- ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
- ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(2, 3)), RecordId(1, 1));
- }
-
- {
- ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
- WriteUnitOfWork wuow(opCtx.get());
- ASSERT_OK(rs->truncate(opCtx.get())); // deletes 1,1 and leaves collection empty
- wuow.commit();
- }
-
- {
- ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
- ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(2, 3)), RecordId());
- }
-}
-
-TEST(WiredTigerRecordStoreTest, OplogHackOnNonOplog) {
- std::unique_ptr<RecordStoreHarnessHelper> harnessHelper = newRecordStoreHarnessHelper();
- unique_ptr<RecordStore> rs(harnessHelper->newNonCappedRecordStore("local.NOT_oplog.foo"));
-
- ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
-
- BSONObj obj = BSON("ts" << Timestamp(2, -1));
- {
- WriteUnitOfWork wuow(opCtx.get());
- ASSERT_OK(rs->insertRecord(opCtx.get(), obj.objdata(), obj.objsize(), false).getStatus());
- wuow.commit();
- }
- ASSERT_EQ(rs->oplogStartHack(opCtx.get(), RecordId(0, 1)), boost::none);
-}
-
-TEST(WiredTigerRecordStoreTest, CappedOrder) {
- unique_ptr<RecordStoreHarnessHelper> harnessHelper(newRecordStoreHarnessHelper());
- unique_ptr<RecordStore> rs(harnessHelper->newCappedRecordStore("a.b", 100000, 10000));
-
- RecordId id1;
-
- { // first insert a document
- ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
- {
- WriteUnitOfWork uow(opCtx.get());
- StatusWith<RecordId> res = rs->insertRecord(opCtx.get(), "a", 2, false);
- ASSERT_OK(res.getStatus());
- id1 = res.getValue();
- uow.commit();
- }
- }
-
- {
- ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
- auto cursor = rs->getCursor(opCtx.get());
- auto record = cursor->seekExact(id1);
- ASSERT_EQ(id1, record->id);
- ASSERT(!cursor->next());
- }
-
- {
- // now we insert 2 docs, but commit the 2nd one fiirst
- // we make sure we can't find the 2nd until the first is commited
- ServiceContext::UniqueOperationContext t1(harnessHelper->newOperationContext());
- unique_ptr<WriteUnitOfWork> w1(new WriteUnitOfWork(t1.get()));
- rs->insertRecord(t1.get(), "b", 2, false).status_with_transitional_ignore();
- // do not commit yet
-
- { // create 2nd doc
- auto client2 = harnessHelper->serviceContext()->makeClient("c2");
- auto t2 = harnessHelper->newOperationContext(client2.get());
- {
- WriteUnitOfWork w2(t2.get());
- rs->insertRecord(t2.get(), "c", 2, false).status_with_transitional_ignore();
- w2.commit();
- }
- }
-
- { // state should be the same
- auto client2 = harnessHelper->serviceContext()->makeClient("c2");
- auto opCtx = harnessHelper->newOperationContext(client2.get());
- auto cursor = rs->getCursor(opCtx.get());
- auto record = cursor->seekExact(id1);
- ASSERT_EQ(id1, record->id);
- ASSERT(!cursor->next());
- }
-
- w1->commit();
- }
-
- { // now all 3 docs should be visible
- auto client2 = harnessHelper->serviceContext()->makeClient("c2");
- auto opCtx = harnessHelper->newOperationContext(client2.get());
- auto cursor = rs->getCursor(opCtx.get());
- auto record = cursor->seekExact(id1);
- ASSERT_EQ(id1, record->id);
- ASSERT(cursor->next());
- ASSERT(cursor->next());
- ASSERT(!cursor->next());
- }
-}
-
TEST(WiredTigerRecordStoreTest, CappedCursorRollover) {
unique_ptr<RecordStoreHarnessHelper> harnessHelper(newRecordStoreHarnessHelper());
unique_ptr<RecordStore> rs(harnessHelper->newCappedRecordStore("a.b", 10000, 5));
@@ -438,8 +264,7 @@ RecordId _oplogOrderInsertOplog(OperationContext* opCtx,
const unique_ptr<RecordStore>& rs,
int inc) {
Timestamp opTime = Timestamp(5, inc);
- WiredTigerRecordStore* wrs = checked_cast<WiredTigerRecordStore*>(rs.get());
- Status status = wrs->oplogDiskLocRegister(opCtx, opTime);
+ Status status = rs->oplogDiskLocRegister(opCtx, opTime);
ASSERT_OK(status);
BSONObj obj = BSON("ts" << opTime);
StatusWith<RecordId> res = rs->insertRecord(opCtx, obj.objdata(), obj.objsize(), false);
@@ -447,149 +272,6 @@ RecordId _oplogOrderInsertOplog(OperationContext* opCtx,
return res.getValue();
}
-TEST(WiredTigerRecordStoreTest, OplogOrder) {
- unique_ptr<RecordStoreHarnessHelper> harnessHelper(newRecordStoreHarnessHelper());
- unique_ptr<RecordStore> rs(harnessHelper->newCappedRecordStore("local.oplog.foo", 100000, -1));
-
- {
- const WiredTigerRecordStore* wrs = checked_cast<WiredTigerRecordStore*>(rs.get());
- ASSERT(wrs->isOplog());
- ASSERT(wrs->usingOplogHack());
- }
-
- RecordId id1;
-
- { // first insert a document
- ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
- {
- WriteUnitOfWork uow(opCtx.get());
- id1 = _oplogOrderInsertOplog(opCtx.get(), rs, 1);
- uow.commit();
- }
- }
-
- {
- ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
- auto cursor = rs->getCursor(opCtx.get());
- auto record = cursor->seekExact(id1);
- ASSERT_EQ(id1, record->id);
- ASSERT(!cursor->next());
- }
-
- {
- // now we insert 2 docs, but commit the 2nd one first.
- // we make sure we can't find the 2nd until the first is commited.
- ServiceContext::UniqueOperationContext earlyReader(harnessHelper->newOperationContext());
- auto earlyCursor = rs->getCursor(earlyReader.get());
- ASSERT_EQ(earlyCursor->seekExact(id1)->id, id1);
- earlyCursor->save();
- earlyReader->recoveryUnit()->abandonSnapshot();
-
- auto client1 = harnessHelper->serviceContext()->makeClient("c1");
- auto t1 = harnessHelper->newOperationContext(client1.get());
- WriteUnitOfWork w1(t1.get());
- _oplogOrderInsertOplog(t1.get(), rs, 20);
- // do not commit yet
-
- { // create 2nd doc
- auto client2 = harnessHelper->serviceContext()->makeClient("c2");
- auto t2 = harnessHelper->newOperationContext(client2.get());
- {
- WriteUnitOfWork w2(t2.get());
- _oplogOrderInsertOplog(t2.get(), rs, 30);
- w2.commit();
- }
- }
-
- { // Other operations should not be able to see 2nd doc until w1 commits.
- earlyCursor->restore();
- ASSERT(!earlyCursor->next());
-
- auto client2 = harnessHelper->serviceContext()->makeClient("c2");
- auto opCtx = harnessHelper->newOperationContext(client2.get());
- auto cursor = rs->getCursor(opCtx.get());
- auto record = cursor->seekExact(id1);
- ASSERT_EQ(id1, record->id);
- ASSERT(!cursor->next());
- }
-
- w1.commit();
- }
-
- rs->waitForAllEarlierOplogWritesToBeVisible(harnessHelper->newOperationContext().get());
-
- { // now all 3 docs should be visible
- auto client2 = harnessHelper->serviceContext()->makeClient("c2");
- auto opCtx = harnessHelper->newOperationContext(client2.get());
- auto cursor = rs->getCursor(opCtx.get());
- auto record = cursor->seekExact(id1);
- ASSERT_EQ(id1, record->id);
- ASSERT(cursor->next());
- ASSERT(cursor->next());
- ASSERT(!cursor->next());
- }
-
- // Rollback the last two oplog entries, then insert entries with older optimes and ensure that
- // the visibility rules aren't violated. See SERVER-21645
- {
- auto client2 = harnessHelper->serviceContext()->makeClient("c2");
- auto opCtx = harnessHelper->newOperationContext(client2.get());
- rs->cappedTruncateAfter(opCtx.get(), id1, /*inclusive*/ false);
- }
-
- {
- // Now we insert 2 docs with timestamps earlier than before, but commit the 2nd one first.
- // We make sure we can't find the 2nd until the first is commited.
- ServiceContext::UniqueOperationContext earlyReader(harnessHelper->newOperationContext());
- auto earlyCursor = rs->getCursor(earlyReader.get());
- ASSERT_EQ(earlyCursor->seekExact(id1)->id, id1);
- earlyCursor->save();
- earlyReader->recoveryUnit()->abandonSnapshot();
-
- auto client1 = harnessHelper->serviceContext()->makeClient("c1");
- auto t1 = harnessHelper->newOperationContext(client1.get());
- WriteUnitOfWork w1(t1.get());
- _oplogOrderInsertOplog(t1.get(), rs, 2);
- // do not commit yet
-
- { // create 2nd doc
- auto client2 = harnessHelper->serviceContext()->makeClient("c2");
- auto t2 = harnessHelper->newOperationContext(client2.get());
- {
- WriteUnitOfWork w2(t2.get());
- _oplogOrderInsertOplog(t2.get(), rs, 3);
- w2.commit();
- }
- }
-
- { // Other operations should not be able to see 2nd doc until w1 commits.
- ASSERT(earlyCursor->restore());
- ASSERT(!earlyCursor->next());
-
- auto client2 = harnessHelper->serviceContext()->makeClient("c2");
- auto opCtx = harnessHelper->newOperationContext(client2.get());
- auto cursor = rs->getCursor(opCtx.get());
- auto record = cursor->seekExact(id1);
- ASSERT_EQ(id1, record->id);
- ASSERT(!cursor->next());
- }
-
- w1.commit();
- }
-
- rs->waitForAllEarlierOplogWritesToBeVisible(harnessHelper->newOperationContext().get());
-
- { // now all 3 docs should be visible
- ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
- auto cursor = rs->getCursor(opCtx.get());
- auto record = cursor->seekExact(id1);
- ASSERT_EQ(id1, record->id);
- ASSERT(cursor->next());
- ASSERT(cursor->next());
- ASSERT(!cursor->next());
- }
-}
-
// Test that even when the oplog durability loop is paused, we can still advance the commit point as
// long as the commit for each insert comes before the next insert starts.
TEST(WiredTigerRecordStoreTest, OplogDurableVisibilityInOrder) {
@@ -597,25 +279,25 @@ TEST(WiredTigerRecordStoreTest, OplogDurableVisibilityInOrder) {
WTPausePrimaryOplogDurabilityLoop.setMode(FailPoint::alwaysOn);
unique_ptr<RecordStoreHarnessHelper> harnessHelper(newRecordStoreHarnessHelper());
- unique_ptr<RecordStore> rs(harnessHelper->newCappedRecordStore("local.oplog.foo", 100000, -1));
+ unique_ptr<RecordStore> rs(harnessHelper->newCappedRecordStore("local.oplog.rs", 100000, -1));
auto wtrs = checked_cast<WiredTigerRecordStore*>(rs.get());
{
ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
WriteUnitOfWork uow(opCtx.get());
RecordId id = _oplogOrderInsertOplog(opCtx.get(), rs, 1);
- ASSERT(wtrs->isCappedHidden(id));
+ ASSERT(wtrs->isOpHidden_forTest(id));
uow.commit();
- ASSERT(!wtrs->isCappedHidden(id));
+ ASSERT(wtrs->isOpHidden_forTest(id));
}
{
ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
WriteUnitOfWork uow(opCtx.get());
RecordId id = _oplogOrderInsertOplog(opCtx.get(), rs, 2);
- ASSERT(wtrs->isCappedHidden(id));
+ ASSERT(wtrs->isOpHidden_forTest(id));
uow.commit();
- ASSERT(!wtrs->isCappedHidden(id));
+ ASSERT(wtrs->isOpHidden_forTest(id));
}
}
@@ -626,14 +308,14 @@ TEST(WiredTigerRecordStoreTest, OplogDurableVisibilityOutOfOrder) {
WTPausePrimaryOplogDurabilityLoop.setMode(FailPoint::alwaysOn);
unique_ptr<RecordStoreHarnessHelper> harnessHelper(newRecordStoreHarnessHelper());
- unique_ptr<RecordStore> rs(harnessHelper->newCappedRecordStore("local.oplog.foo", 100000, -1));
+ unique_ptr<RecordStore> rs(harnessHelper->newCappedRecordStore("local.oplog.rs", 100000, -1));
auto wtrs = checked_cast<WiredTigerRecordStore*>(rs.get());
ServiceContext::UniqueOperationContext longLivedOp(harnessHelper->newOperationContext());
WriteUnitOfWork uow(longLivedOp.get());
RecordId id1 = _oplogOrderInsertOplog(longLivedOp.get(), rs, 1);
- ASSERT(wtrs->isCappedHidden(id1));
+ ASSERT(wtrs->isOpHidden_forTest(id1));
RecordId id2;
@@ -643,29 +325,29 @@ TEST(WiredTigerRecordStoreTest, OplogDurableVisibilityOutOfOrder) {
harnessHelper->newOperationContext(innerClient.get()));
WriteUnitOfWork uow(opCtx.get());
id2 = _oplogOrderInsertOplog(opCtx.get(), rs, 2);
- ASSERT(wtrs->isCappedHidden(id2));
+ ASSERT(wtrs->isOpHidden_forTest(id2));
uow.commit();
}
- ASSERT(wtrs->isCappedHidden(id1));
- ASSERT(wtrs->isCappedHidden(id2));
+ ASSERT(wtrs->isOpHidden_forTest(id1));
+ ASSERT(wtrs->isOpHidden_forTest(id2));
uow.commit();
- ASSERT(wtrs->isCappedHidden(id1));
- ASSERT(wtrs->isCappedHidden(id2));
+ ASSERT(wtrs->isOpHidden_forTest(id1));
+ ASSERT(wtrs->isOpHidden_forTest(id2));
// Wait a bit and check again to make sure they don't become visible automatically.
sleepsecs(1);
- ASSERT(wtrs->isCappedHidden(id1));
- ASSERT(wtrs->isCappedHidden(id2));
+ ASSERT(wtrs->isOpHidden_forTest(id1));
+ ASSERT(wtrs->isOpHidden_forTest(id2));
WTPausePrimaryOplogDurabilityLoop.setMode(FailPoint::off);
rs->waitForAllEarlierOplogWritesToBeVisible(longLivedOp.get());
- ASSERT(!wtrs->isCappedHidden(id1));
- ASSERT(!wtrs->isCappedHidden(id2));
+ ASSERT(!wtrs->isOpHidden_forTest(id1));
+ ASSERT(!wtrs->isOpHidden_forTest(id2));
}
TEST(WiredTigerRecordStoreTest, AppendCustomStatsMetadata) {
@@ -963,6 +645,9 @@ TEST(WiredTigerRecordStoreTest, OplogStones_CappedTruncateAfter) {
ASSERT_EQ(300, oplogStones->currentBytes());
}
+ // Make sure all are visible.
+ rs->waitForAllEarlierOplogWritesToBeVisible(harnessHelper->newOperationContext().get());
+
// Truncate data using an inclusive RecordId that exists inside the stone currently being
// filled.
{
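
The tests in this file all exercise the same visibility contract: a committed oplog insert stays hidden until the oplog read point advances, either via the durability loop or an explicit wait. A minimal sketch of that contract, reusing helpers that already appear in this file (harnessHelper, _oplogOrderInsertOplog, wtrs, the WTPausePrimaryOplogDurabilityLoop failpoint); it is illustrative only, not part of the patch:

    // With the durability loop paused, even a committed insert remains hidden.
    WTPausePrimaryOplogDurabilityLoop.setMode(FailPoint::alwaysOn);
    RecordId id;
    {
        ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
        WriteUnitOfWork uow(opCtx.get());
        id = _oplogOrderInsertOplog(opCtx.get(), rs, 1);
        uow.commit();
        ASSERT(wtrs->isOpHidden_forTest(id));
    }
    // Unpause and wait: the record becomes visible to subsequent readers.
    WTPausePrimaryOplogDurabilityLoop.setMode(FailPoint::off);
    rs->waitForAllEarlierOplogWritesToBeVisible(harnessHelper->newOperationContext().get());
    ASSERT(!wtrs->isOpHidden_forTest(id));
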
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
index 72bad9e184c..f3a66ada681 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
@@ -34,17 +34,12 @@
#include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h"
-#include "mongo/base/init.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/server_options.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_util.h"
-#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/mutex.h"
-#include "mongo/util/concurrency/ticketholder.h"
#include "mongo/util/log.h"
-#include "mongo/util/mongoutils/str.h"
-#include "mongo/util/stacktrace.h"
namespace mongo {
namespace {
@@ -159,7 +154,7 @@ void WiredTigerRecoveryUnit::assertInActiveTxn() const {
WiredTigerSession* WiredTigerRecoveryUnit::getSession(OperationContext* opCtx) {
if (!_active) {
- _txnOpen(opCtx);
+ _txnOpen();
}
return _session.get();
}
@@ -183,10 +178,6 @@ void* WiredTigerRecoveryUnit::writingPtr(void* data, size_t len) {
MONGO_UNREACHABLE;
}
-void WiredTigerRecoveryUnit::setOplogReadTill(const RecordId& id) {
- _oplogReadTill = id;
-}
-
void WiredTigerRecoveryUnit::_txnClose(bool commit) {
invariant(_active);
WT_SESSION* s = _session->getSession();
@@ -207,7 +198,7 @@ void WiredTigerRecoveryUnit::_txnClose(bool commit) {
}
_active = false;
_mySnapshotId = nextSnapshotId.fetchAndAdd(1);
- _oplogReadTill = RecordId();
+ _isOplogReader = false;
}
SnapshotId WiredTigerRecoveryUnit::getSnapshotId() const {
@@ -233,7 +224,7 @@ boost::optional<SnapshotName> WiredTigerRecoveryUnit::getMajorityCommittedSnapsh
return _majorityCommittedSnapshot;
}
-void WiredTigerRecoveryUnit::_txnOpen(OperationContext* opCtx) {
+void WiredTigerRecoveryUnit::_txnOpen() {
invariant(!_active);
_ensureSession();
@@ -241,19 +232,43 @@ void WiredTigerRecoveryUnit::_txnOpen(OperationContext* opCtx) {
if (shouldLog(kSlowTransactionSeverity)) {
_timer.reset(new Timer());
}
- WT_SESSION* s = _session->getSession();
+ WT_SESSION* session = _session->getSession();
if (_readFromMajorityCommittedSnapshot) {
_majorityCommittedSnapshot =
- _sessionCache->snapshotManager().beginTransactionOnCommittedSnapshot(s);
+ _sessionCache->snapshotManager().beginTransactionOnCommittedSnapshot(session);
+ } else if (_isOplogReader) {
+ _sessionCache->snapshotManager().beginTransactionOnOplog(
+ _sessionCache->getKVEngine()->getOplogManager(), session);
} else {
- invariantWTOK(s->begin_transaction(s, NULL));
+ invariantWTOK(session->begin_transaction(session, NULL));
}
LOG(3) << "WT begin_transaction for snapshot id " << _mySnapshotId;
_active = true;
}
+
+Status WiredTigerRecoveryUnit::setTimestamp(SnapshotName timestamp) {
+ _ensureSession();
+ LOG(3) << "WT set timestamp of future write operations to " << timestamp;
+ WT_SESSION* session = _session->getSession();
+ invariant(_inUnitOfWork);
+
+ // Starts the WT transaction associated with this session.
+ getSession(nullptr);
+
+ const std::string conf = str::stream() << "commit_timestamp=" << timestamp.toString();
+ auto rc = session->timestamp_transaction(session, conf.c_str());
+ return wtRCToStatus(rc, "timestamp_transaction");
+}
+
+void WiredTigerRecoveryUnit::setIsOplogReader() {
+ // Note: it would be nice to assert !active here, but OplogStones currently opens a cursor on
+ // the oplog while the recovery unit is already active.
+ _isOplogReader = true;
+}
+
// ---------------------
WiredTigerCursor::WiredTigerCursor(const std::string& uri,
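
The setTimestamp() method added above is the hook a caller such as the oplog write path uses to stamp the writes of the current unit of work with a commit timestamp. A hedged sketch of the intended call pattern; the helper name, the opCtx/ts variables, and the SnapshotName-from-Timestamp construction are illustrative assumptions, not part of this patch:

    // Sketch only: must run inside an open WriteUnitOfWork, since setTimestamp()
    // invariants on _inUnitOfWork and opens the WT transaction before issuing
    // WT_SESSION::timestamp_transaction("commit_timestamp=...").
    void timestampOplogWrite(OperationContext* opCtx, Timestamp ts) {
        auto ru = WiredTigerRecoveryUnit::get(opCtx);
        uassertStatusOK(ru->setTimestamp(SnapshotName(ts)));
        // Record store writes made after this point in the same unit of work
        // carry ts as their commit timestamp.
    }
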
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
index a3c1df67773..b9583bf8c79 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
@@ -1,5 +1,3 @@
-// wiredtiger_recovery_unit.h
-
/**
* Copyright (C) 2014 MongoDB Inc.
*
@@ -32,19 +30,17 @@
#include <wiredtiger.h>
-#include <memory.h>
-
+#include <boost/optional.hpp>
+#include <cstdint>
#include <memory>
#include <vector>
#include "mongo/base/checked_cast.h"
-#include "mongo/base/owned_pointer_vector.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/record_id.h"
#include "mongo/db/storage/recovery_unit.h"
#include "mongo/db/storage/snapshot_name.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h"
-#include "mongo/util/concurrency/ticketholder.h"
#include "mongo/util/timer.h"
namespace mongo {
@@ -54,35 +50,37 @@ class BSONObjBuilder;
class WiredTigerRecoveryUnit final : public RecoveryUnit {
public:
WiredTigerRecoveryUnit(WiredTigerSessionCache* sc);
+ ~WiredTigerRecoveryUnit();
- virtual ~WiredTigerRecoveryUnit();
+ void beginUnitOfWork(OperationContext* opCtx) override;
+ void commitUnitOfWork() override;
+ void abortUnitOfWork() override;
- void beginUnitOfWork(OperationContext* opCtx) final;
- void commitUnitOfWork() final;
- void abortUnitOfWork() final;
+ bool waitUntilDurable() override;
- virtual bool waitUntilDurable();
+ void registerChange(Change* change) override;
- virtual void registerChange(Change* change);
+ void abandonSnapshot() override;
- virtual void abandonSnapshot();
+ Status setReadFromMajorityCommittedSnapshot() override;
+ bool isReadingFromMajorityCommittedSnapshot() const override {
+ return _readFromMajorityCommittedSnapshot;
+ }
- virtual void* writingPtr(void* data, size_t len);
+ boost::optional<SnapshotName> getMajorityCommittedSnapshot() const override;
- virtual void setRollbackWritesDisabled() {}
+ SnapshotId getSnapshotId() const override;
- virtual SnapshotId getSnapshotId() const;
+ Status setTimestamp(SnapshotName timestamp) override;
- Status setReadFromMajorityCommittedSnapshot() final;
- bool isReadingFromMajorityCommittedSnapshot() const final {
- return _readFromMajorityCommittedSnapshot;
- }
+ void* writingPtr(void* data, size_t len) override;
- boost::optional<SnapshotName> getMajorityCommittedSnapshot() const final;
+ void setRollbackWritesDisabled() override {}
// ---- WT STUFF
WiredTigerSession* getSession(OperationContext* opCtx);
+ void setIsOplogReader();
/**
* Returns a session without starting a new WT txn on the session. Will not close any already
@@ -99,11 +97,6 @@ public:
}
void assertInActiveTxn() const;
- void setOplogReadTill(const RecordId& id);
- RecordId getOplogReadTill() const {
- return _oplogReadTill;
- }
-
static WiredTigerRecoveryUnit* get(OperationContext* opCtx) {
return checked_cast<WiredTigerRecoveryUnit*>(opCtx->recoveryUnit());
}
@@ -124,7 +117,9 @@ private:
void _ensureSession();
void _txnClose(bool commit);
- void _txnOpen(OperationContext* opCtx);
+ void _txnOpen();
+
+ char* _getOplogReaderConfigString();
WiredTigerSessionCache* _sessionCache; // not owned
UniqueWiredTigerSession _session;
@@ -132,11 +127,10 @@ private:
bool _inUnitOfWork;
bool _active;
uint64_t _mySnapshotId;
- RecordId _oplogReadTill;
bool _readFromMajorityCommittedSnapshot = false;
SnapshotName _majorityCommittedSnapshot = SnapshotName::min();
std::unique_ptr<Timer> _timer;
-
+ bool _isOplogReader = false;
typedef std::vector<std::unique_ptr<Change>> Changes;
Changes _changes;
};
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
index c796b3b7407..fdffe30c5e2 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
@@ -389,4 +389,5 @@ void WiredTigerSessionCache::WiredTigerSessionDeleter::operator()(
WiredTigerSession* session) const {
session->_cache->releaseSession(session);
}
+
} // namespace mongo
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h
index 60622792519..80fa93016c7 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h
@@ -210,6 +210,10 @@ public:
return _cursorEpoch.load();
}
+ WiredTigerKVEngine* getKVEngine() const {
+ return _engine;
+ }
+
private:
WiredTigerKVEngine* _engine; // not owned, might be NULL
WT_CONNECTION* _conn; // not owned
@@ -235,10 +239,10 @@ private:
AtomicUInt32 _lastSyncTime;
stdx::mutex _lastSyncMutex;
- // Notified when we commit to the journal.
- JournalListener* _journalListener = &NoOpJournalListener::instance;
// Protects _journalListener.
stdx::mutex _journalListenerMutex;
+ // Notified when we commit to the journal.
+ JournalListener* _journalListener = &NoOpJournalListener::instance;
/**
* Returns a session to the cache for later reuse. If closeAll was called between getting this
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.cpp
index ea42dac59d0..cd58ea18a13 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.cpp
@@ -31,7 +31,10 @@
#include "mongo/platform/basic.h"
+#include <algorithm>
+
#include "mongo/base/checked_cast.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h"
@@ -53,11 +56,20 @@ Status WiredTigerSnapshotManager::createSnapshot(OperationContext* opCtx,
return wtRCToStatus(session->snapshot(session, config.c_str()));
}
-void WiredTigerSnapshotManager::setCommittedSnapshot(const SnapshotName& name) {
+void WiredTigerSnapshotManager::setCommittedSnapshot(const SnapshotName& name, Timestamp ts) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
invariant(!_committedSnapshot || *_committedSnapshot <= name);
_committedSnapshot = name;
+
+ char oldestTSConfigString["oldest_timestamp="_sd.size() + (8 * 2) /* 16 hexadecimal digits */ +
+ 1 /* trailing null */];
+ auto size = std::snprintf(
+ oldestTSConfigString, sizeof(oldestTSConfigString), "oldest_timestamp=%llx", ts.asULL());
+ invariant(static_cast<std::size_t>(size) < sizeof(oldestTSConfigString));
+ invariantWTOK(_conn->set_timestamp(_conn, oldestTSConfigString));
+ _oldestKeptTimestamp = ts;
+ LOG(2) << "oldest_timestamp set to " << oldestTSConfigString;
}
void WiredTigerSnapshotManager::cleanupUnneededSnapshots() {
@@ -74,6 +86,7 @@ void WiredTigerSnapshotManager::cleanupUnneededSnapshots() {
void WiredTigerSnapshotManager::dropAllSnapshots() {
stdx::lock_guard<stdx::mutex> lock(_mutex);
_committedSnapshot = boost::none;
+
invariantWTOK(_session->snapshot(_session, "drop=(all)"));
}
@@ -106,4 +119,23 @@ SnapshotName WiredTigerSnapshotManager::beginTransactionOnCommittedSnapshot(
return *_committedSnapshot;
}
+void WiredTigerSnapshotManager::beginTransactionOnOplog(WiredTigerOplogManager* oplogManager,
+ WT_SESSION* session) const {
+ auto allCommittedTimestamp = oplogManager->getOplogReadTimestamp();
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+ // Choose a read timestamp that is >= oldest_timestamp, but <= all_committed.
+ // Using "unsigned long long" here to ensure that the snprintf works with %llx on all
+ // platforms.
+ unsigned long long readTimestamp =
+ std::max<std::uint64_t>(allCommittedTimestamp, _oldestKeptTimestamp.asULL());
+
+ char readTSConfigString[15 /* read_timestamp= */ + (8 * 2) /* 16 hexadecimal digits */ +
+ 1 /* trailing null */];
+ auto size = std::snprintf(
+ readTSConfigString, sizeof(readTSConfigString), "read_timestamp=%llx", readTimestamp);
+ invariant(static_cast<std::size_t>(size) < sizeof(readTSConfigString));
+ invariantWTOK(session->begin_transaction(session, readTSConfigString));
+}
+
} // namespace mongo
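
The read timestamp chosen by beginTransactionOnOplog() is the all-committed point clamped up so it never falls below the oldest timestamp WiredTiger still keeps history for, then formatted as hexadecimal for the begin_transaction config string. A self-contained sketch of that clamping and formatting, with made-up example values:

    #include <algorithm>
    #include <cstdint>
    #include <cstdio>

    int main() {
        std::uint64_t allCommitted = 0x5a0e2b4c00000002ULL;  // illustrative all_committed point
        std::uint64_t oldestKept   = 0x5a0e2b4c00000005ULL;  // illustrative oldest_timestamp
        // Reads may not go below oldest_timestamp, so clamp upward.
        unsigned long long readTimestamp = std::max<std::uint64_t>(allCommitted, oldestKept);
        char config[15 /* read_timestamp= */ + 16 /* hex digits */ + 1 /* NUL */];
        std::snprintf(config, sizeof(config), "read_timestamp=%llx", readTimestamp);
        std::printf("%s\n", config);  // prints: read_timestamp=5a0e2b4c00000005
        return 0;
    }
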
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.h b/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.h
index d885df0c863..96e574402b0 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_snapshot_manager.h
@@ -39,12 +39,15 @@
namespace mongo {
+class WiredTigerOplogManager;
+
class WiredTigerSnapshotManager final : public SnapshotManager {
MONGO_DISALLOW_COPYING(WiredTigerSnapshotManager);
public:
explicit WiredTigerSnapshotManager(WT_CONNECTION* conn) {
invariantWTOK(conn->open_session(conn, NULL, NULL, &_session));
+ _conn = conn;
}
~WiredTigerSnapshotManager() {
@@ -52,8 +55,8 @@ public:
}
Status prepareForCreateSnapshot(OperationContext* opCtx) final;
- Status createSnapshot(OperationContext* ru, const SnapshotName& name) final;
- void setCommittedSnapshot(const SnapshotName& name) final;
+ Status createSnapshot(OperationContext* opCtx, const SnapshotName& name) final;
+ void setCommittedSnapshot(const SnapshotName& name, Timestamp ts) final;
void cleanupUnneededSnapshots() final;
void dropAllSnapshots() final;
@@ -74,6 +77,11 @@ public:
SnapshotName beginTransactionOnCommittedSnapshot(WT_SESSION* session) const;
/**
+     * Starts a transaction on the oplog using an appropriate timestamp for oplog visibility.
+ */
+ void beginTransactionOnOplog(WiredTigerOplogManager* oplogManager, WT_SESSION* session) const;
+
+ /**
* Returns lowest SnapshotName that could possibly be used by a future call to
* beginTransactionOnCommittedSnapshot, or boost::none if there is currently no committed
* snapshot.
@@ -86,6 +94,9 @@ public:
private:
mutable stdx::mutex _mutex; // Guards all members.
boost::optional<SnapshotName> _committedSnapshot;
- WT_SESSION* _session; // only used for dropping snapshots.
+ Timestamp _oldestKeptTimestamp; // The timestamp communicated to WiredTiger before which no
+ // timestamp history is preserved.
+ WT_SESSION* _session;
+ WT_CONNECTION* _conn;
};
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp
index c391836685f..a1576eb7526 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp
@@ -40,8 +40,10 @@
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/json.h"
#include "mongo/db/operation_context_noop.h"
+#include "mongo/db/storage/kv/kv_engine_test_harness.h"
#include "mongo/db/storage/kv/kv_prefix.h"
#include "mongo/db/storage/record_store_test_harness.h"
+#include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_record_store_oplog_stones.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h"
@@ -51,6 +53,7 @@
#include "mongo/stdx/memory.h"
#include "mongo/unittest/temp_dir.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/clock_source_mock.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/scopeguard.h"
@@ -63,42 +66,32 @@ using std::stringstream;
class WiredTigerHarnessHelper final : public RecordStoreHarnessHelper {
public:
- static WT_CONNECTION* createConnection(StringData dbpath, StringData extraStrings) {
- WT_CONNECTION* conn = NULL;
-
- std::stringstream ss;
- ss << "create,";
- ss << "statistics=(all),";
- ss << extraStrings;
- string config = ss.str();
- int ret = wiredtiger_open(dbpath.toString().c_str(), NULL, config.c_str(), &conn);
- ASSERT_OK(wtRCToStatus(ret));
- ASSERT(conn);
-
- return conn;
- }
-
WiredTigerHarnessHelper()
: _dbpath("wt_test"),
- _conn(createConnection(_dbpath.path(), "")),
- _sessionCache(new WiredTigerSessionCache(_conn)) {}
+ _engine(kWiredTigerEngineName, _dbpath.path(), &_cs, "", 1, false, false, false, false) {}
WiredTigerHarnessHelper(StringData extraStrings)
: _dbpath("wt_test"),
- _conn(createConnection(_dbpath.path(), extraStrings)),
- _sessionCache(new WiredTigerSessionCache(_conn)) {}
+ _engine(kWiredTigerEngineName,
+ _dbpath.path(),
+ &_cs,
+ extraStrings.toString(),
+ 1,
+ false,
+ false,
+ false,
+ false) {}
- ~WiredTigerHarnessHelper() {
- delete _sessionCache;
- _conn->close(_conn, NULL);
- }
+
+ ~WiredTigerHarnessHelper() {}
virtual std::unique_ptr<RecordStore> newNonCappedRecordStore() {
return newNonCappedRecordStore("a.b");
}
virtual std::unique_ptr<RecordStore> newNonCappedRecordStore(const std::string& ns) {
- WiredTigerRecoveryUnit* ru = new WiredTigerRecoveryUnit(_sessionCache);
+ WiredTigerRecoveryUnit* ru =
+ dynamic_cast<WiredTigerRecoveryUnit*>(_engine.newRecoveryUnit());
OperationContextNoop opCtx(ru);
string uri = "table:" + ns;
@@ -126,7 +119,7 @@ public:
params.cappedCallback = nullptr;
params.sizeStorer = nullptr;
- auto ret = stdx::make_unique<StandardWiredTigerRecordStore>(&opCtx, params);
+ auto ret = stdx::make_unique<StandardWiredTigerRecordStore>(&_engine, &opCtx, params);
ret->postConstructorInit(&opCtx);
return std::move(ret);
}
@@ -139,7 +132,8 @@ public:
virtual std::unique_ptr<RecordStore> newCappedRecordStore(const std::string& ns,
int64_t cappedMaxSize,
int64_t cappedMaxDocs) {
- WiredTigerRecoveryUnit* ru = new WiredTigerRecoveryUnit(_sessionCache);
+ WiredTigerRecoveryUnit* ru =
+ dynamic_cast<WiredTigerRecoveryUnit*>(_engine.newRecoveryUnit());
OperationContextNoop opCtx(ru);
string uri = "table:a.b";
@@ -170,27 +164,28 @@ public:
params.cappedCallback = nullptr;
params.sizeStorer = nullptr;
- auto ret = stdx::make_unique<StandardWiredTigerRecordStore>(&opCtx, params);
+ auto ret = stdx::make_unique<StandardWiredTigerRecordStore>(&_engine, &opCtx, params);
ret->postConstructorInit(&opCtx);
return std::move(ret);
}
virtual std::unique_ptr<RecoveryUnit> newRecoveryUnit() final {
- return stdx::make_unique<WiredTigerRecoveryUnit>(_sessionCache);
+ return std::unique_ptr<RecoveryUnit>(_engine.newRecoveryUnit());
}
virtual bool supportsDocLocking() final {
return true;
}
- virtual WT_CONNECTION* conn() const {
- return _conn;
+ virtual WT_CONNECTION* conn() {
+ return _engine.getConnection();
}
private:
unittest::TempDir _dbpath;
- WT_CONNECTION* _conn;
- WiredTigerSessionCache* _sessionCache;
+ ClockSourceMock _cs;
+
+ WiredTigerKVEngine _engine;
};
std::unique_ptr<HarnessHelper> makeHarnessHelper() {
@@ -262,7 +257,7 @@ TEST(WiredTigerRecordStoreTest, SizeStorer1) {
params.cappedCallback = nullptr;
params.sizeStorer = &ss;
- auto ret = new StandardWiredTigerRecordStore(opCtx.get(), params);
+ auto ret = new StandardWiredTigerRecordStore(nullptr, opCtx.get(), params);
ret->postConstructorInit(opCtx.get());
rs.reset(ret);
}
@@ -433,7 +428,7 @@ TEST_F(SizeStorerValidateTest, InvalidSizeStorerAtCreation) {
params.cappedCallback = nullptr;
params.sizeStorer = sizeStorer.get();
- auto ret = new StandardWiredTigerRecordStore(opCtx.get(), params);
+ auto ret = new StandardWiredTigerRecordStore(nullptr, opCtx.get(), params);
ret->postConstructorInit(opCtx.get());
rs.reset(ret);
diff --git a/src/mongo/dbtests/repltests.cpp b/src/mongo/dbtests/repltests.cpp
index 6290b31f860..302ddb28b5b 100644
--- a/src/mongo/dbtests/repltests.cpp
+++ b/src/mongo/dbtests/repltests.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/index_catalog.h"
#include "mongo/db/client.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
@@ -211,17 +212,19 @@ protected:
}
// These deletes don't get logged.
void deleteAll(const char* ns) const {
- Lock::GlobalWrite lk(&_opCtx);
- OldClientContext ctx(&_opCtx, ns);
- WriteUnitOfWork wunit(&_opCtx);
- Database* db = ctx.db();
- Collection* coll = db->getCollection(&_opCtx, ns);
- if (!coll) {
- coll = db->createCollection(&_opCtx, ns);
- }
+ ::mongo::writeConflictRetry(&_opCtx, "deleteAll", ns, [&] {
+ Lock::GlobalWrite lk(&_opCtx);
+ OldClientContext ctx(&_opCtx, ns);
+ WriteUnitOfWork wunit(&_opCtx);
+ Database* db = ctx.db();
+ Collection* coll = db->getCollection(&_opCtx, ns);
+ if (!coll) {
+ coll = db->createCollection(&_opCtx, ns);
+ }
- ASSERT_OK(coll->truncate(&_opCtx));
- wunit.commit();
+ ASSERT_OK(coll->truncate(&_opCtx));
+ wunit.commit();
+ });
}
void insert(const BSONObj& o) const {
Lock::GlobalWrite lk(&_opCtx);
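
The deleteAll() change above is the standard writeConflictRetry idiom: the whole lock/WriteUnitOfWork/commit body moves inside the retry lambda so that a WriteConflictException simply restarts it. A generic sketch of the idiom as used here (the operation name and write body are placeholders):

    ::mongo::writeConflictRetry(&_opCtx, "myOperation", ns, [&] {
        Lock::GlobalWrite lk(&_opCtx);
        OldClientContext ctx(&_opCtx, ns);
        WriteUnitOfWork wunit(&_opCtx);
        // ... perform the write against ctx.db() here ...
        wunit.commit();
    });
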
diff --git a/src/mongo/unittest/temp_dir.h b/src/mongo/unittest/temp_dir.h
index a3dd3fcc2d2..6ea1d1ed4fd 100644
--- a/src/mongo/unittest/temp_dir.h
+++ b/src/mongo/unittest/temp_dir.h
@@ -26,6 +26,8 @@
* then also delete it in the license file.
*/
+#pragma once
+
#include <string>
#include "mongo/base/disallow_copying.h"