summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEric Milkie <milkie@10gen.com>2017-09-19 11:27:59 -0400
committerEric Milkie <milkie@10gen.com>2017-09-25 13:17:16 -0400
commit8c118d741327e6e3fe29955f7f75f6fe853297e5 (patch)
treeb215e7de950e487f67fbd8f98fe27437d90a70d3
parent62fe0f89e9fa15c69f912d31b822b3631f691fe7 (diff)
downloadmongo-8c118d741327e6e3fe29955f7f75f6fe853297e5.tar.gz
SERVER-31086 adorn secondary inserts and vector inserts with timestamps
-rw-r--r--jstests/replsets/oplog_replay_on_startup.js4
-rw-r--r--src/mongo/base/error_codes.err1
-rw-r--r--src/mongo/db/commands/feature_compatibility_version.cpp13
-rw-r--r--src/mongo/db/repl/apply_ops.cpp20
-rw-r--r--src/mongo/db/repl/databases_cloner_test.cpp6
-rw-r--r--src/mongo/db/repl/initial_syncer.cpp7
-rw-r--r--src/mongo/db/repl/initial_syncer_test.cpp34
-rw-r--r--src/mongo/db/repl/oplog.cpp90
-rw-r--r--src/mongo/db/repl/oplog.h5
-rw-r--r--src/mongo/db/repl/oplog_buffer_collection_test.cpp38
-rw-r--r--src/mongo/db/repl/replication_consistency_markers_impl_test.cpp3
-rw-r--r--src/mongo/db/repl/replication_process_test.cpp21
-rw-r--r--src/mongo/db/repl/replication_recovery_test.cpp3
-rw-r--r--src/mongo/db/repl/storage_interface.h3
-rw-r--r--src/mongo/db/repl/storage_interface_impl.cpp10
-rw-r--r--src/mongo/db/repl/storage_interface_impl.h3
-rw-r--r--src/mongo/db/repl/storage_interface_impl_test.cpp272
-rw-r--r--src/mongo/db/repl/storage_interface_mock.h21
-rw-r--r--src/mongo/db/repl/sync_tail.cpp49
-rw-r--r--src/mongo/db/repl/sync_tail_test.cpp21
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp11
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h2
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp22
-rw-r--r--src/mongo/dbtests/SConscript9
-rw-r--r--src/mongo/dbtests/dbtests.cpp19
-rw-r--r--src/mongo/dbtests/storage_timestamp_tests.cpp304
26 files changed, 761 insertions, 230 deletions
diff --git a/jstests/replsets/oplog_replay_on_startup.js b/jstests/replsets/oplog_replay_on_startup.js
index bfe48252c18..132e05bdb97 100644
--- a/jstests/replsets/oplog_replay_on_startup.js
+++ b/jstests/replsets/oplog_replay_on_startup.js
@@ -58,7 +58,7 @@
oplogEntries.forEach((num) => {
assert.writeOK(oplog.insert({
ts: ts(num),
- t: term,
+ t: NumberLong(term),
h: 1,
op: 'i',
ns: ns,
@@ -76,7 +76,7 @@
// appliedThrough
begin: {
ts: ts(begin),
- t: term,
+ t: NumberLong(term),
},
// minvalid:
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index 5ee1bff461b..61c69067ad2 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -223,6 +223,7 @@ error_code("CloseChangeStream", 222)
error_code("IllegalOpMsgFlag", 223)
error_code("JSONSchemaNotAllowed", 224)
error_code("TransactionTooOld", 225)
+error_code("AtomicityFailure", 226)
# Error codes 4000-8999 are reserved.
diff --git a/src/mongo/db/commands/feature_compatibility_version.cpp b/src/mongo/db/commands/feature_compatibility_version.cpp
index 14a2aad1aba..f6515bc0cd1 100644
--- a/src/mongo/db/commands/feature_compatibility_version.cpp
+++ b/src/mongo/db/commands/feature_compatibility_version.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
@@ -236,11 +237,13 @@ void FeatureCompatibilityVersion::setIfCleanStartup(OperationContext* opCtx,
uassertStatusOK(storageInterface->insertDocument(
opCtx,
nss,
- repl::TimestampedBSONObj{
- BSON("_id" << FeatureCompatibilityVersion::kParameterName
- << FeatureCompatibilityVersion::kVersionField
- << FeatureCompatibilityVersionCommandParser::kVersion36),
- SnapshotName()})); // No timestamp because this write is not replicated.
+ repl::TimestampedBSONObj{BSON("_id"
+ << FeatureCompatibilityVersion::kParameterName
+ << FeatureCompatibilityVersion::kVersionField
+ << FeatureCompatibilityVersionCommandParser::kVersion36),
+ SnapshotName()},
+ repl::OpTime::kUninitializedTerm)); // No timestamp or term because this write is not
+ // replicated.
}
}
diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp
index aeb493e610c..af561045a00 100644
--- a/src/mongo/db/repl/apply_ops.cpp
+++ b/src/mongo/db/repl/apply_ops.cpp
@@ -137,8 +137,10 @@ Status _applyOps(OperationContext* opCtx,
auto db = dbHolder().get(opCtx, nss.ns());
if (!db) {
+ // Retry in non-atomic mode, since MMAP cannot implicitly create a new database
+ // within an active WriteUnitOfWork.
throw DBException(
- ErrorCodes::NamespaceNotFound,
+ ErrorCodes::AtomicityFailure,
"cannot create a database in atomic applyOps mode; will retry without "
"atomicity");
}
@@ -151,7 +153,7 @@ Status _applyOps(OperationContext* opCtx,
auto collection = db->getCollection(opCtx, nss);
if (!collection && !nss.isSystemDotIndexes() && (*opType == 'i' || *opType == 'u')) {
throw DBException(
- ErrorCodes::NamespaceNotFound,
+ ErrorCodes::AtomicityFailure,
str::stream()
<< "cannot apply insert or update operation on a non-existent namespace "
<< nss.ns()
@@ -159,7 +161,15 @@ Status _applyOps(OperationContext* opCtx,
<< redact(opObj));
}
+ // Cannot specify timestamp values in an atomic applyOps.
+ if (opObj.hasField("ts")) {
+ throw DBException(ErrorCodes::AtomicityFailure,
+ "cannot apply an op with a timestamp in atomic applyOps mode; "
+ "will retry without atomicity");
+ }
+
OldClientContext ctx(opCtx, nss.ns());
+
status = repl::applyOperation_inlock(opCtx, ctx.db(), opObj, alwaysUpsert);
if (!status.isOK())
return status;
@@ -363,6 +373,7 @@ Status applyOps(OperationContext* opCtx,
// Perform write ops atomically
invariant(globalWriteLock);
+
try {
writeConflictRetry(opCtx, "applyOps", dbName, [&] {
BSONObjBuilder intermediateResult;
@@ -400,9 +411,8 @@ Status applyOps(OperationContext* opCtx,
result->appendElements(intermediateResult.obj());
});
} catch (const DBException& ex) {
- if (ex.code() == ErrorCodes::NamespaceNotFound) {
- // Retry in non-atomic mode, since MMAP cannot implicitly create a new database
- // within an active WriteUnitOfWork.
+ if (ex.code() == ErrorCodes::AtomicityFailure) {
+ // Retry in non-atomic mode.
return _applyOps(opCtx, dbName, applyOpCmd, result, &numApplied);
}
BSONArrayBuilder ab;
diff --git a/src/mongo/db/repl/databases_cloner_test.cpp b/src/mongo/db/repl/databases_cloner_test.cpp
index 85a5d7e5795..3705afed383 100644
--- a/src/mongo/db/repl/databases_cloner_test.cpp
+++ b/src/mongo/db/repl/databases_cloner_test.cpp
@@ -146,8 +146,10 @@ protected:
_storageInterfaceWorkDone.createOplogCalled = true;
return Status::OK();
};
- _storageInterface.insertDocumentFn = [this](
- OperationContext* opCtx, const NamespaceString& nss, const TimestampedBSONObj& doc) {
+ _storageInterface.insertDocumentFn = [this](OperationContext* opCtx,
+ const NamespaceString& nss,
+ const TimestampedBSONObj& doc,
+ long long term) {
++_storageInterfaceWorkDone.documentsInsertedCount;
return Status::OK();
};
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index b2c21cc62d8..96a03c06414 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -756,7 +756,6 @@ void InitialSyncer::_databasesClonerCallback(const Status& databaseClonerFinishS
void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp(
const StatusWith<Fetcher::QueryResponse>& result,
std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
- Timestamp oplogSeedDocTimestamp;
OpTimeWithHash optimeWithHash;
{
stdx::lock_guard<stdx::mutex> lock(_mutex);
@@ -774,8 +773,7 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp(
return;
}
optimeWithHash = optimeWithHashStatus.getValue();
- oplogSeedDocTimestamp = _initialSyncState->stopTimestamp =
- optimeWithHash.opTime.getTimestamp();
+ _initialSyncState->stopTimestamp = optimeWithHash.opTime.getTimestamp();
if (_initialSyncState->beginTimestamp != _initialSyncState->stopTimestamp) {
invariant(_lastApplied.opTime.isNull());
@@ -800,7 +798,8 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp(
auto status = _storage->insertDocument(
opCtx.get(),
_opts.localOplogNS,
- TimestampedBSONObj{oplogSeedDoc, SnapshotName(oplogSeedDocTimestamp)});
+ TimestampedBSONObj{oplogSeedDoc, SnapshotName(optimeWithHash.opTime.getTimestamp())},
+ optimeWithHash.opTime.getTerm());
if (!status.isOK()) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 8d765cefc04..952c3d5e2f4 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -242,8 +242,10 @@ protected:
_storageInterfaceWorkDone.truncateCalled = true;
return Status::OK();
};
- _storageInterface->insertDocumentFn = [this](
- OperationContext* opCtx, const NamespaceString& nss, const TimestampedBSONObj& doc) {
+ _storageInterface->insertDocumentFn = [this](OperationContext* opCtx,
+ const NamespaceString& nss,
+ const TimestampedBSONObj& doc,
+ long long term) {
LockGuard lock(_storageInterfaceWorkDoneMutex);
++_storageInterfaceWorkDone.documentsInsertedCount;
return Status::OK();
@@ -2053,12 +2055,17 @@ TEST_F(
NamespaceString insertDocumentNss;
TimestampedBSONObj insertDocumentDoc;
- _storageInterface->insertDocumentFn = [&insertDocumentDoc, &insertDocumentNss](
- OperationContext*, const NamespaceString& nss, const TimestampedBSONObj& doc) {
- insertDocumentNss = nss;
- insertDocumentDoc = doc;
- return Status(ErrorCodes::OperationFailed, "failed to insert oplog entry");
- };
+ long long insertDocumentTerm;
+ _storageInterface->insertDocumentFn =
+ [&insertDocumentDoc, &insertDocumentNss, &insertDocumentTerm](OperationContext*,
+ const NamespaceString& nss,
+ const TimestampedBSONObj& doc,
+ long long term) {
+ insertDocumentNss = nss;
+ insertDocumentDoc = doc;
+ insertDocumentTerm = term;
+ return Status(ErrorCodes::OperationFailed, "failed to insert oplog entry");
+ };
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
@@ -2112,10 +2119,17 @@ TEST_F(
NamespaceString insertDocumentNss;
TimestampedBSONObj insertDocumentDoc;
- _storageInterface->insertDocumentFn = [initialSyncer, &insertDocumentDoc, &insertDocumentNss](
- OperationContext*, const NamespaceString& nss, const TimestampedBSONObj& doc) {
+ long long insertDocumentTerm;
+ _storageInterface->insertDocumentFn = [initialSyncer,
+ &insertDocumentDoc,
+ &insertDocumentNss,
+ &insertDocumentTerm](OperationContext*,
+ const NamespaceString& nss,
+ const TimestampedBSONObj& doc,
+ long long term) {
insertDocumentNss = nss;
insertDocumentDoc = doc;
+ insertDocumentTerm = term;
initialSyncer->shutdown().transitional_ignore();
return Status::OK();
};
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index a88d8dc7589..6fa9c678fbd 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -351,7 +351,6 @@ OplogDocWriter _logOpWriter(OperationContext* opCtx,
}
appendSessionInfo(opCtx, &b, statementId, sessionInfo, oplogLink);
-
return OplogDocWriter(OplogDocWriter(b.obj(), obj));
}
} // end anon namespace
@@ -936,25 +935,22 @@ Status applyOperation_inlock(OperationContext* opCtx,
OpCounters* opCounters = opCtx->writesAreReplicated() ? &globalOpCounters : &replOpCounters;
- std::array<StringData, 7> names = {"ts", "o", "ui", "ns", "op", "b", "o2"};
- std::array<BSONElement, 7> fields;
+ std::array<StringData, 8> names = {"ts", "t", "o", "ui", "ns", "op", "b", "o2"};
+ std::array<BSONElement, 8> fields;
op.getFields(names, &fields);
BSONElement& fieldTs = fields[0];
- BSONElement& fieldO = fields[1];
- BSONElement& fieldUI = fields[2];
- BSONElement& fieldNs = fields[3];
- BSONElement& fieldOp = fields[4];
- BSONElement& fieldB = fields[5];
- BSONElement& fieldO2 = fields[6];
+ BSONElement& fieldT = fields[1];
+ BSONElement& fieldO = fields[2];
+ BSONElement& fieldUI = fields[3];
+ BSONElement& fieldNs = fields[4];
+ BSONElement& fieldOp = fields[5];
+ BSONElement& fieldB = fields[6];
+ BSONElement& fieldO2 = fields[7];
BSONObj o;
if (fieldO.isABSONObj())
o = fieldO.embeddedObject();
- SnapshotName timestamp;
- if (fieldTs.ok()) {
- timestamp = SnapshotName(fieldTs.timestamp());
- }
// operation type -- see logOp() comments for types
const char* opType = fieldOp.valuestrsafe();
@@ -1020,16 +1016,55 @@ Status applyOperation_inlock(OperationContext* opCtx,
if (fieldO.type() == Array) {
// Batched inserts.
+
+ // Cannot apply an array insert with applyOps command. No support for wiping out
+ // the provided timestamps and using new ones for oplog.
+ uassert(ErrorCodes::OperationFailed,
+ "Cannot apply an array insert with applyOps",
+ !opCtx->writesAreReplicated());
+
+ uassert(ErrorCodes::BadValue,
+ "Expected array for field 'ts'",
+ fieldTs.ok() && fieldTs.type() == Array);
+ uassert(ErrorCodes::BadValue,
+ "Expected array for field 't'",
+ fieldT.ok() && fieldT.type() == Array);
+
+ uassert(ErrorCodes::OperationFailed,
+ str::stream() << "Failed to apply insert due to empty array element: "
+ << op.toString(),
+ !fieldO.Obj().isEmpty() && !fieldTs.Obj().isEmpty() && !fieldT.Obj().isEmpty());
+
std::vector<InsertStatement> insertObjs;
- for (auto elem : fieldO.Obj()) {
+ auto fieldOIt = fieldO.Obj().begin();
+ auto fieldTsIt = fieldTs.Obj().begin();
+ auto fieldTIt = fieldT.Obj().begin();
+
+ while (true) {
+ auto oElem = fieldOIt.next();
+ auto tsElem = fieldTsIt.next();
+ auto tElem = fieldTIt.next();
+
// Note: we don't care about statement ids here since the secondaries don't create
// their own oplog entries.
- insertObjs.emplace_back(elem.Obj(), timestamp);
+ insertObjs.emplace_back(
+ oElem.Obj(), SnapshotName(tsElem.timestamp()), tElem.Long());
+ if (!fieldOIt.more()) {
+ // Make sure arrays are the same length.
+ uassert(ErrorCodes::OperationFailed,
+ str::stream()
+ << "Failed to apply insert due to invalid array elements: "
+ << op.toString(),
+ !fieldTsIt.more());
+ break;
+ }
+ // Make sure arrays are the same length.
+ uassert(ErrorCodes::OperationFailed,
+ str::stream() << "Failed to apply insert due to invalid array elements: "
+ << op.toString(),
+ fieldTsIt.more());
}
- uassert(ErrorCodes::OperationFailed,
- str::stream() << "Failed to apply insert due to empty array element: "
- << op.toString(),
- !insertObjs.empty());
+
WriteUnitOfWork wuow(opCtx);
OpDebug* const nullOpDebug = nullptr;
Status status = collection->insertDocuments(
@@ -1066,10 +1101,25 @@ Status applyOperation_inlock(OperationContext* opCtx,
bool needToDoUpsert = haveWrappingWriteUnitOfWork;
if (!needToDoUpsert) {
+ SnapshotName timestamp;
+ long long term = OpTime::kUninitializedTerm;
+
WriteUnitOfWork wuow(opCtx);
+
+ // Do not use supplied timestamps if running through applyOps, as that would allow
+ // a user to dictate what timestamps appear in the oplog.
+ if (!opCtx->writesAreReplicated()) {
+ if (fieldTs.ok()) {
+ timestamp = SnapshotName(fieldTs.timestamp());
+ }
+ if (fieldT.ok()) {
+ term = fieldT.Long();
+ }
+ }
+
OpDebug* const nullOpDebug = nullptr;
auto status = collection->insertDocument(
- opCtx, InsertStatement(o, timestamp), nullOpDebug, true);
+ opCtx, InsertStatement(o, timestamp, term), nullOpDebug, true);
if (status.isOK()) {
wuow.commit();
} else if (status == ErrorCodes::DuplicateKey) {
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index 444b83236a2..a5eff8ac1aa 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -64,9 +64,8 @@ public:
InsertStatement(StmtId statementId, BSONObj toInsert) : stmtId(statementId), doc(toInsert) {}
InsertStatement(StmtId statementId, BSONObj toInsert, OplogSlot os)
: stmtId(statementId), oplogSlot(os), doc(toInsert) {}
- InsertStatement(BSONObj toInsert, SnapshotName ts)
- : oplogSlot(repl::OpTime(Timestamp(ts.asU64()), repl::OpTime::kUninitializedTerm), 0),
- doc(toInsert) {}
+ InsertStatement(BSONObj toInsert, SnapshotName ts, long long term)
+ : oplogSlot(repl::OpTime(Timestamp(ts.asU64()), term), 0), doc(toInsert) {}
StmtId stmtId = kUninitializedStmtId;
OplogSlot oplogSlot;
diff --git a/src/mongo/db/repl/oplog_buffer_collection_test.cpp b/src/mongo/db/repl/oplog_buffer_collection_test.cpp
index 2e6c24324a0..c40bebd731e 100644
--- a/src/mongo/db/repl/oplog_buffer_collection_test.cpp
+++ b/src/mongo/db/repl/oplog_buffer_collection_test.cpp
@@ -273,7 +273,8 @@ TEST_F(OplogBufferCollectionTest, StartupWithExistingCollectionInitializesCorrec
_opCtx.get(),
nss,
TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(oplog[0], {}, 0)),
- SnapshotName(0)}));
+ SnapshotName(0)},
+ OpTime::kUninitializedTerm));
_assertDocumentsInCollectionEquals(_opCtx.get(), nss, oplog);
OplogBufferCollection::Options opts;
@@ -365,8 +366,11 @@ DEATH_TEST_F(OplogBufferCollectionTest,
CollectionOptions collOpts;
collOpts.setNoIdIndex();
ASSERT_OK(_storageInterface->createCollection(_opCtx.get(), nss, collOpts));
- ASSERT_OK(_storageInterface->insertDocument(
- _opCtx.get(), nss, TimestampedBSONObj{makeOplogEntry(1), SnapshotName(0)}));
+ ASSERT_OK(
+ _storageInterface->insertDocument(_opCtx.get(),
+ nss,
+ TimestampedBSONObj{makeOplogEntry(1), SnapshotName(0)},
+ OpTime::kUninitializedTerm));
OplogBufferCollection::Options opts;
opts.dropCollectionAtStartup = false;
@@ -379,8 +383,11 @@ DEATH_TEST_F(OplogBufferCollectionTest,
"Fatal assertion 40405 NoSuchKey: Missing expected field \"ts\"") {
auto nss = makeNamespace(_agent);
ASSERT_OK(_storageInterface->createCollection(_opCtx.get(), nss, CollectionOptions()));
- ASSERT_OK(_storageInterface->insertDocument(
- _opCtx.get(), nss, TimestampedBSONObj{BSON("_id" << 1), SnapshotName(0)}));
+ ASSERT_OK(
+ _storageInterface->insertDocument(_opCtx.get(),
+ nss,
+ TimestampedBSONObj{BSON("_id" << 1), SnapshotName(0)},
+ OpTime::kUninitializedTerm));
OplogBufferCollection::Options opts;
opts.dropCollectionAtStartup = false;
@@ -396,7 +403,8 @@ DEATH_TEST_F(OplogBufferCollectionTest,
ASSERT_OK(_storageInterface->insertDocument(
_opCtx.get(),
nss,
- TimestampedBSONObj{BSON("_id" << BSON("ts" << Timestamp(1, 1))), SnapshotName(1)}));
+ TimestampedBSONObj{BSON("_id" << BSON("ts" << Timestamp(1, 1))), SnapshotName(1)},
+ OpTime::kUninitializedTerm));
OplogBufferCollection::Options opts;
opts.dropCollectionAtStartup = false;
@@ -411,7 +419,8 @@ TEST_F(OplogBufferCollectionTest, PeekWithExistingCollectionReturnsEmptyObjectWh
_opCtx.get(),
nss,
TimestampedBSONObj{BSON("_id" << BSON("ts" << Timestamp(1, 1) << "s" << 0)),
- SnapshotName(1)}));
+ SnapshotName(1)},
+ OpTime::kUninitializedTerm));
OplogBufferCollection::Options opts;
opts.dropCollectionAtStartup = false;
@@ -432,7 +441,8 @@ TEST_F(OplogBufferCollectionTest,
_opCtx.get(),
nss,
TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(oplog[0], {}, 0)),
- SnapshotName(0)}));
+ SnapshotName(0)},
+ OpTime::kUninitializedTerm));
OplogBufferCollection::Options opts;
OplogBufferCollection oplogBuffer(_storageInterface, nss, opts);
@@ -535,7 +545,8 @@ TEST_F(OplogBufferCollectionTest, PeekingFromExistingCollectionReturnsDocument)
_opCtx.get(),
nss,
TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(oplog[0], {}, 0)),
- SnapshotName(0)}));
+ SnapshotName(0)},
+ OpTime::kUninitializedTerm));
OplogBufferCollection::Options opts;
opts.dropCollectionAtStartup = false;
@@ -678,13 +689,15 @@ TEST_F(OplogBufferCollectionTest,
_opCtx.get(),
nss,
TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(firstDoc, {}, 0)),
- SnapshotName(0)}));
+ SnapshotName(0)},
+ OpTime::kUninitializedTerm));
auto secondDoc = makeOplogEntry(2);
ASSERT_OK(_storageInterface->insertDocument(
_opCtx.get(),
nss,
TimestampedBSONObj{std::get<0>(OplogBufferCollection::addIdToDocument(secondDoc, {}, 0)),
- SnapshotName(0)}));
+ SnapshotName(0)},
+ OpTime::kUninitializedTerm));
OplogBufferCollection::Options opts;
opts.dropCollectionAtStartup = false;
@@ -803,7 +816,8 @@ TEST_F(OplogBufferCollectionTest, WaitForDataReturnsImmediatelyWhenStartedWithEx
nss,
TimestampedBSONObj{
std::get<0>(OplogBufferCollection::addIdToDocument(makeOplogEntry(1), {}, 0)),
- SnapshotName(0)}));
+ SnapshotName(0)},
+ OpTime::kUninitializedTerm));
OplogBufferCollection::Options opts;
opts.dropCollectionAtStartup = false;
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp b/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp
index 34e2f2cae72..bb3199e2a62 100644
--- a/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp
+++ b/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp
@@ -311,7 +311,8 @@ TEST_F(ReplicationConsistencyMarkersTest, OplogTruncateAfterPointUpgrade) {
<< minValidTime.getTerm()
<< MinValidDocument::kOldOplogDeleteFromPointFieldName
<< time1),
- SnapshotName(0)}));
+ SnapshotName(0)},
+ OpTime::kUninitializedTerm));
consistencyMarkers.initializeMinValidDocument(opCtx);
// Set the feature compatibility version to 3.6.
diff --git a/src/mongo/db/repl/replication_process_test.cpp b/src/mongo/db/repl/replication_process_test.cpp
index 3950e27418b..99b3d22e287 100644
--- a/src/mongo/db/repl/replication_process_test.cpp
+++ b/src/mongo/db/repl/replication_process_test.cpp
@@ -113,7 +113,8 @@ TEST_F(ReplicationProcessTest,
ReplicationProcess::kRollbackProgressNamespace,
TimestampedBSONObj{BSON("_id"
<< "not progress"),
- SnapshotName(0)}));
+ SnapshotName()},
+ OpTime::kUninitializedTerm));
ASSERT_EQUALS(ErrorCodes::NoSuchKey, replicationProcess.getRollbackProgress(opCtx.get()));
}
@@ -126,8 +127,10 @@ TEST_F(ReplicationProcessTest, GetRollbackProgressReturnsBadStatusIfApplyUntilFi
auto opCtx = makeOpCtx();
ASSERT_OK(_storageInterface->createCollection(
opCtx.get(), ReplicationProcess::kRollbackProgressNamespace, {}));
- ASSERT_OK(_storageInterface->insertDocument(
- opCtx.get(), ReplicationProcess::kRollbackProgressNamespace, doc));
+ ASSERT_OK(_storageInterface->insertDocument(opCtx.get(),
+ ReplicationProcess::kRollbackProgressNamespace,
+ doc,
+ OpTime::kUninitializedTerm));
ReplicationProcess replicationProcess(
_storageInterface.get(),
@@ -149,8 +152,10 @@ TEST_F(ReplicationProcessTest,
auto opCtx = makeOpCtx();
ASSERT_OK(_storageInterface->createCollection(
opCtx.get(), ReplicationProcess::kRollbackProgressNamespace, {}));
- ASSERT_OK(_storageInterface->insertDocument(
- opCtx.get(), ReplicationProcess::kRollbackProgressNamespace, doc));
+ ASSERT_OK(_storageInterface->insertDocument(opCtx.get(),
+ ReplicationProcess::kRollbackProgressNamespace,
+ doc,
+ OpTime::kUninitializedTerm));
ReplicationProcess replicationProcess(
_storageInterface.get(),
@@ -170,8 +175,10 @@ TEST_F(ReplicationProcessTest,
auto opCtx = makeOpCtx();
ASSERT_OK(_storageInterface->createCollection(
opCtx.get(), ReplicationProcess::kRollbackProgressNamespace, {}));
- ASSERT_OK(_storageInterface->insertDocument(
- opCtx.get(), ReplicationProcess::kRollbackProgressNamespace, doc));
+ ASSERT_OK(_storageInterface->insertDocument(opCtx.get(),
+ ReplicationProcess::kRollbackProgressNamespace,
+ doc,
+ OpTime::kUninitializedTerm));
ReplicationProcess replicationProcess(
_storageInterface.get(),
diff --git a/src/mongo/db/repl/replication_recovery_test.cpp b/src/mongo/db/repl/replication_recovery_test.cpp
index 7d91431d949..65468b1be59 100644
--- a/src/mongo/db/repl/replication_recovery_test.cpp
+++ b/src/mongo/db/repl/replication_recovery_test.cpp
@@ -163,7 +163,8 @@ void _setUpOplog(OperationContext* opCtx, StorageInterface* storage, std::vector
ASSERT_OK(storage->createCollection(opCtx, oplogNs, _createOplogCollectionOptions()));
for (int ts : timestamps) {
- ASSERT_OK(storage->insertDocument(opCtx, oplogNs, _makeOplogEntry(ts)));
+ ASSERT_OK(storage->insertDocument(
+ opCtx, oplogNs, _makeOplogEntry(ts), OpTime::kUninitializedTerm));
}
}
diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h
index 309b1715aa0..837d73fa4f1 100644
--- a/src/mongo/db/repl/storage_interface.h
+++ b/src/mongo/db/repl/storage_interface.h
@@ -111,7 +111,8 @@ public:
*/
virtual Status insertDocument(OperationContext* opCtx,
const NamespaceString& nss,
- const TimestampedBSONObj& doc) = 0;
+ const TimestampedBSONObj& doc,
+ long long term) = 0;
/**
* Inserts the given documents, with associated timestamps and statement id's, into the
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index e9f6b002cf6..47f2fd8d92f 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -127,7 +127,10 @@ Status StorageInterfaceImpl::initializeRollbackID(OperationContext* opCtx) {
BSONObjBuilder bob;
rbid.serialize(&bob);
SnapshotName noTimestamp; // This write is not replicated.
- return insertDocument(opCtx, _rollbackIdNss, TimestampedBSONObj{bob.done(), noTimestamp});
+ return insertDocument(opCtx,
+ _rollbackIdNss,
+ TimestampedBSONObj{bob.done(), noTimestamp},
+ OpTime::kUninitializedTerm);
}
Status StorageInterfaceImpl::incrementRollbackID(OperationContext* opCtx) {
@@ -264,8 +267,9 @@ StorageInterfaceImpl::createCollectionForBulkLoading(
Status StorageInterfaceImpl::insertDocument(OperationContext* opCtx,
const NamespaceString& nss,
- const TimestampedBSONObj& doc) {
- return insertDocuments(opCtx, nss, {InsertStatement(doc.obj, doc.timestamp)});
+ const TimestampedBSONObj& doc,
+ long long term) {
+ return insertDocuments(opCtx, nss, {InsertStatement(doc.obj, doc.timestamp, term)});
}
namespace {
diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h
index dfb367f398d..c020648106d 100644
--- a/src/mongo/db/repl/storage_interface_impl.h
+++ b/src/mongo/db/repl/storage_interface_impl.h
@@ -66,7 +66,8 @@ public:
Status insertDocument(OperationContext* opCtx,
const NamespaceString& nss,
- const TimestampedBSONObj& doc) override;
+ const TimestampedBSONObj& doc,
+ long long term) override;
Status insertDocuments(OperationContext* opCtx,
const NamespaceString& nss,
diff --git a/src/mongo/db/repl/storage_interface_impl_test.cpp b/src/mongo/db/repl/storage_interface_impl_test.cpp
index f86d56f1a25..607ba207b57 100644
--- a/src/mongo/db/repl/storage_interface_impl_test.cpp
+++ b/src/mongo/db/repl/storage_interface_impl_test.cpp
@@ -145,7 +145,7 @@ std::vector<InsertStatement> transformInserts(std::vector<BSONObj> docs) {
std::vector<InsertStatement> transformInserts(std::vector<TimestampedBSONObj> docs) {
std::vector<InsertStatement> inserts(docs.size());
std::transform(docs.cbegin(), docs.cend(), inserts.begin(), [](const TimestampedBSONObj& doc) {
- return InsertStatement(doc.obj, doc.timestamp);
+ return InsertStatement(doc.obj, doc.timestamp, OpTime::kUninitializedTerm);
});
return inserts;
}
@@ -308,7 +308,7 @@ TEST_F(StorageInterfaceImplTest, IncrementRollbackIDRollsToZeroWhenExceedingMaxI
<< StorageInterfaceImpl::kRollbackIdFieldName
<< std::numeric_limits<int>::max()),
SnapshotName(0)};
- ASSERT_OK(storage.insertDocument(opCtx, nss, maxDoc));
+ ASSERT_OK(storage.insertDocument(opCtx, nss, maxDoc, OpTime::kUninitializedTerm));
_assertRollbackIDDocument(opCtx, std::numeric_limits<int>::max());
auto rbid = unittest::assertGet(storage.getRollbackID(opCtx));
@@ -459,7 +459,8 @@ TEST_F(StorageInterfaceImplTest, InsertMissingDocWorksOnExistingCappedCollection
opts.capped = true;
opts.cappedSize = 1024 * 1024;
createCollection(opCtx, nss, opts);
- ASSERT_OK(storage.insertDocument(opCtx, nss, {BSON("_id" << 1), SnapshotName(1)}));
+ ASSERT_OK(storage.insertDocument(
+ opCtx, nss, {BSON("_id" << 1), SnapshotName(1)}, OpTime::kUninitializedTerm));
AutoGetCollectionForReadCommand autoColl(opCtx, nss);
ASSERT_TRUE(autoColl.getCollection());
}
@@ -469,7 +470,8 @@ TEST_F(StorageInterfaceImplTest, InsertMissingDocWorksOnExistingCollection) {
StorageInterfaceImpl storage;
auto nss = makeNamespace(_agent);
createCollection(opCtx, nss);
- ASSERT_OK(storage.insertDocument(opCtx, nss, {BSON("_id" << 1), SnapshotName(1)}));
+ ASSERT_OK(storage.insertDocument(
+ opCtx, nss, {BSON("_id" << 1), SnapshotName(1)}, OpTime::kUninitializedTerm));
AutoGetCollectionForReadCommand autoColl(opCtx, nss);
ASSERT_TRUE(autoColl.getCollection());
}
@@ -478,7 +480,8 @@ TEST_F(StorageInterfaceImplTest, InsertMissingDocFailesIfCollectionIsMissing) {
auto opCtx = getOperationContext();
StorageInterfaceImpl storage;
auto nss = makeNamespace(_agent);
- const auto status = storage.insertDocument(opCtx, nss, {BSON("_id" << 1), SnapshotName(1)});
+ const auto status = storage.insertDocument(
+ opCtx, nss, {BSON("_id" << 1), SnapshotName(1)}, OpTime::kUninitializedTerm);
ASSERT_NOT_OK(status);
ASSERT_EQ(status.code(), ErrorCodes::NamespaceNotFound);
}
@@ -648,7 +651,8 @@ TEST_F(StorageInterfaceImplTest, DropCollectionWorksWithExistingWithDataCollecti
StorageInterfaceImpl storage;
auto nss = makeNamespace(_agent);
createCollection(opCtx, nss);
- ASSERT_OK(storage.insertDocument(opCtx, nss, {BSON("_id" << 1), SnapshotName(1)}));
+ ASSERT_OK(storage.insertDocument(
+ opCtx, nss, {BSON("_id" << 1), SnapshotName(1)}, OpTime::kUninitializedTerm));
ASSERT_OK(storage.dropCollection(opCtx, nss));
}
@@ -1055,13 +1059,14 @@ TEST_F(StorageInterfaceImplTest,
auto nss = makeNamespace(_agent);
auto indexName = "_id_"_sd;
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- ASSERT_OK(storage.insertDocuments(opCtx,
- nss,
- {{BSON("_id" << 0), SnapshotName(0)},
- {BSON("_id" << 1), SnapshotName(1)},
- {BSON("_id" << 2), SnapshotName(2)},
- {BSON("_id" << 3), SnapshotName(3)},
- {BSON("_id" << 4), SnapshotName(4)}}));
+ ASSERT_OK(
+ storage.insertDocuments(opCtx,
+ nss,
+ {{BSON("_id" << 0), SnapshotName(0), OpTime::kUninitializedTerm},
+ {BSON("_id" << 1), SnapshotName(1), OpTime::kUninitializedTerm},
+ {BSON("_id" << 2), SnapshotName(2), OpTime::kUninitializedTerm},
+ {BSON("_id" << 3), SnapshotName(3), OpTime::kUninitializedTerm},
+ {BSON("_id" << 4), SnapshotName(4), OpTime::kUninitializedTerm}}));
// startKey not provided
ASSERT_BSONOBJ_EQ(
@@ -1170,11 +1175,12 @@ TEST_F(StorageInterfaceImplTest,
StorageInterfaceImpl storage;
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- ASSERT_OK(storage.insertDocuments(opCtx,
- nss,
- {{BSON("_id" << 1), SnapshotName(1)},
- {BSON("_id" << 2), SnapshotName(2)},
- {BSON("_id" << 0), SnapshotName(0)}}));
+ ASSERT_OK(
+ storage.insertDocuments(opCtx,
+ nss,
+ {{BSON("_id" << 1), SnapshotName(1), OpTime::kUninitializedTerm},
+ {BSON("_id" << 2), SnapshotName(2), OpTime::kUninitializedTerm},
+ {BSON("_id" << 0), SnapshotName(0), OpTime::kUninitializedTerm}}));
ASSERT_BSONOBJ_EQ(
BSON("_id" << 1),
_assetGetFront(storage.findDocuments(opCtx,
@@ -1200,11 +1206,12 @@ TEST_F(StorageInterfaceImplTest,
StorageInterfaceImpl storage;
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- ASSERT_OK(storage.insertDocuments(opCtx,
- nss,
- {{BSON("_id" << 1), SnapshotName(1)},
- {BSON("_id" << 2), SnapshotName(2)},
- {BSON("_id" << 0), SnapshotName(0)}}));
+ ASSERT_OK(
+ storage.insertDocuments(opCtx,
+ nss,
+ {{BSON("_id" << 1), SnapshotName(1), OpTime::kUninitializedTerm},
+ {BSON("_id" << 2), SnapshotName(2), OpTime::kUninitializedTerm},
+ {BSON("_id" << 0), SnapshotName(0), OpTime::kUninitializedTerm}}));
ASSERT_BSONOBJ_EQ(
BSON("_id" << 0),
_assetGetFront(storage.findDocuments(opCtx,
@@ -1224,11 +1231,12 @@ TEST_F(StorageInterfaceImplTest, FindDocumentsCollScanReturnsNoSuchKeyIfStartKey
StorageInterfaceImpl storage;
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- ASSERT_OK(storage.insertDocuments(opCtx,
- nss,
- {{BSON("_id" << 1), SnapshotName(1)},
- {BSON("_id" << 2), SnapshotName(2)},
- {BSON("_id" << 0), SnapshotName(0)}}));
+ ASSERT_OK(
+ storage.insertDocuments(opCtx,
+ nss,
+ {{BSON("_id" << 1), SnapshotName(1), OpTime::kUninitializedTerm},
+ {BSON("_id" << 2), SnapshotName(2), OpTime::kUninitializedTerm},
+ {BSON("_id" << 0), SnapshotName(0), OpTime::kUninitializedTerm}}));
ASSERT_EQUALS(ErrorCodes::NoSuchKey,
storage
.findDocuments(opCtx,
@@ -1247,11 +1255,12 @@ TEST_F(StorageInterfaceImplTest,
StorageInterfaceImpl storage;
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- ASSERT_OK(storage.insertDocuments(opCtx,
- nss,
- {{BSON("_id" << 1), SnapshotName(0)},
- {BSON("_id" << 2), SnapshotName(0)},
- {BSON("_id" << 0), SnapshotName(0)}}));
+ ASSERT_OK(
+ storage.insertDocuments(opCtx,
+ nss,
+ {{BSON("_id" << 1), SnapshotName(0), OpTime::kUninitializedTerm},
+ {BSON("_id" << 2), SnapshotName(0), OpTime::kUninitializedTerm},
+ {BSON("_id" << 0), SnapshotName(0), OpTime::kUninitializedTerm}}));
ASSERT_EQUALS(ErrorCodes::InvalidOptions,
storage
.findDocuments(opCtx,
@@ -1323,16 +1332,17 @@ TEST_F(StorageInterfaceImplTest,
auto nss = makeNamespace(_agent);
auto indexName = "_id_"_sd;
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- ASSERT_OK(storage.insertDocuments(opCtx,
- nss,
- {{BSON("_id" << 0), SnapshotName(0)},
- {BSON("_id" << 1), SnapshotName(1)},
- {BSON("_id" << 2), SnapshotName(2)},
- {BSON("_id" << 3), SnapshotName(3)},
- {BSON("_id" << 4), SnapshotName(4)},
- {BSON("_id" << 5), SnapshotName(5)},
- {BSON("_id" << 6), SnapshotName(6)},
- {BSON("_id" << 7), SnapshotName(7)}}));
+ ASSERT_OK(
+ storage.insertDocuments(opCtx,
+ nss,
+ {{BSON("_id" << 0), SnapshotName(0), OpTime::kUninitializedTerm},
+ {BSON("_id" << 1), SnapshotName(1), OpTime::kUninitializedTerm},
+ {BSON("_id" << 2), SnapshotName(2), OpTime::kUninitializedTerm},
+ {BSON("_id" << 3), SnapshotName(3), OpTime::kUninitializedTerm},
+ {BSON("_id" << 4), SnapshotName(4), OpTime::kUninitializedTerm},
+ {BSON("_id" << 5), SnapshotName(5), OpTime::kUninitializedTerm},
+ {BSON("_id" << 6), SnapshotName(6), OpTime::kUninitializedTerm},
+ {BSON("_id" << 7), SnapshotName(7), OpTime::kUninitializedTerm}}));
// startKey not provided
ASSERT_BSONOBJ_EQ(
@@ -1433,16 +1443,17 @@ TEST_F(StorageInterfaceImplTest,
auto nss = makeNamespace(_agent);
auto indexName = "_id_"_sd;
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- ASSERT_OK(storage.insertDocuments(opCtx,
- nss,
- {{BSON("_id" << 0), SnapshotName(0)},
- {BSON("_id" << 1), SnapshotName(1)},
- {BSON("_id" << 2), SnapshotName(2)},
- {BSON("_id" << 3), SnapshotName(3)},
- {BSON("_id" << 4), SnapshotName(4)},
- {BSON("_id" << 5), SnapshotName(5)},
- {BSON("_id" << 6), SnapshotName(6)},
- {BSON("_id" << 7), SnapshotName(7)}}));
+ ASSERT_OK(
+ storage.insertDocuments(opCtx,
+ nss,
+ {{BSON("_id" << 0), SnapshotName(0), OpTime::kUninitializedTerm},
+ {BSON("_id" << 1), SnapshotName(1), OpTime::kUninitializedTerm},
+ {BSON("_id" << 2), SnapshotName(2), OpTime::kUninitializedTerm},
+ {BSON("_id" << 3), SnapshotName(3), OpTime::kUninitializedTerm},
+ {BSON("_id" << 4), SnapshotName(4), OpTime::kUninitializedTerm},
+ {BSON("_id" << 5), SnapshotName(5), OpTime::kUninitializedTerm},
+ {BSON("_id" << 6), SnapshotName(6), OpTime::kUninitializedTerm},
+ {BSON("_id" << 7), SnapshotName(7), OpTime::kUninitializedTerm}}));
// startKey not provided
ASSERT_BSONOBJ_EQ(
@@ -1542,11 +1553,12 @@ TEST_F(StorageInterfaceImplTest,
StorageInterfaceImpl storage;
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- ASSERT_OK(storage.insertDocuments(opCtx,
- nss,
- {{BSON("_id" << 1), SnapshotName(0)},
- {BSON("_id" << 2), SnapshotName(0)},
- {BSON("_id" << 0), SnapshotName(0)}}));
+ ASSERT_OK(
+ storage.insertDocuments(opCtx,
+ nss,
+ {{BSON("_id" << 1), SnapshotName(0), OpTime::kUninitializedTerm},
+ {BSON("_id" << 2), SnapshotName(0), OpTime::kUninitializedTerm},
+ {BSON("_id" << 0), SnapshotName(0), OpTime::kUninitializedTerm}}));
ASSERT_BSONOBJ_EQ(
BSON("_id" << 1),
_assetGetFront(storage.deleteDocuments(opCtx,
@@ -1566,11 +1578,12 @@ TEST_F(StorageInterfaceImplTest,
StorageInterfaceImpl storage;
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- ASSERT_OK(storage.insertDocuments(opCtx,
- nss,
- {{BSON("_id" << 1), SnapshotName(0)},
- {BSON("_id" << 2), SnapshotName(0)},
- {BSON("_id" << 0), SnapshotName(0)}}));
+ ASSERT_OK(
+ storage.insertDocuments(opCtx,
+ nss,
+ {{BSON("_id" << 1), SnapshotName(0), OpTime::kUninitializedTerm},
+ {BSON("_id" << 2), SnapshotName(0), OpTime::kUninitializedTerm},
+ {BSON("_id" << 0), SnapshotName(0), OpTime::kUninitializedTerm}}));
ASSERT_BSONOBJ_EQ(
BSON("_id" << 0),
_assetGetFront(storage.deleteDocuments(opCtx,
@@ -1589,11 +1602,12 @@ TEST_F(StorageInterfaceImplTest, DeleteDocumentsCollScanReturnsNoSuchKeyIfStartK
StorageInterfaceImpl storage;
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- ASSERT_OK(storage.insertDocuments(opCtx,
- nss,
- {{BSON("_id" << 1), SnapshotName(0)},
- {BSON("_id" << 2), SnapshotName(0)},
- {BSON("_id" << 0), SnapshotName(0)}}));
+ ASSERT_OK(
+ storage.insertDocuments(opCtx,
+ nss,
+ {{BSON("_id" << 1), SnapshotName(0), OpTime::kUninitializedTerm},
+ {BSON("_id" << 2), SnapshotName(0), OpTime::kUninitializedTerm},
+ {BSON("_id" << 0), SnapshotName(0), OpTime::kUninitializedTerm}}));
ASSERT_EQUALS(ErrorCodes::NoSuchKey,
storage
.deleteDocuments(opCtx,
@@ -1612,11 +1626,12 @@ TEST_F(StorageInterfaceImplTest,
StorageInterfaceImpl storage;
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- ASSERT_OK(storage.insertDocuments(opCtx,
- nss,
- {{BSON("_id" << 1), SnapshotName(0)},
- {BSON("_id" << 2), SnapshotName(0)},
- {BSON("_id" << 0), SnapshotName(0)}}));
+ ASSERT_OK(
+ storage.insertDocuments(opCtx,
+ nss,
+ {{BSON("_id" << 1), SnapshotName(0), OpTime::kUninitializedTerm},
+ {BSON("_id" << 2), SnapshotName(0), OpTime::kUninitializedTerm},
+ {BSON("_id" << 0), SnapshotName(0), OpTime::kUninitializedTerm}}));
ASSERT_EQUALS(ErrorCodes::InvalidOptions,
storage
.deleteDocuments(opCtx,
@@ -1660,8 +1675,10 @@ TEST_F(StorageInterfaceImplTest,
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
auto doc1 = BSON("_id" << 0 << "x" << 0);
auto doc2 = BSON("_id" << 1 << "x" << 1);
- ASSERT_OK(
- storage.insertDocuments(opCtx, nss, {{doc1, SnapshotName(0)}, {doc2, SnapshotName(0)}}));
+ ASSERT_OK(storage.insertDocuments(opCtx,
+ nss,
+ {{doc1, SnapshotName(0), OpTime::kUninitializedTerm},
+ {doc2, SnapshotName(0), OpTime::kUninitializedTerm}}));
ASSERT_EQUALS(ErrorCodes::TooManyMatchingDocuments,
storage.findSingleton(opCtx, nss).getStatus());
}
@@ -1672,7 +1689,8 @@ TEST_F(StorageInterfaceImplTest, FindSingletonReturnsDocumentWhenSingletonDocume
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
auto doc1 = BSON("_id" << 0 << "x" << 0);
- ASSERT_OK(storage.insertDocument(opCtx, nss, {doc1, SnapshotName(0)}));
+ ASSERT_OK(
+ storage.insertDocument(opCtx, nss, {doc1, SnapshotName(0)}, OpTime::kUninitializedTerm));
ASSERT_BSONOBJ_EQ(doc1, unittest::assertGet(storage.findSingleton(opCtx, nss)));
}
@@ -1712,7 +1730,8 @@ TEST_F(StorageInterfaceImplTest, PutSingletonUpdatesDocumentWhenCollectionIsNotE
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
auto doc1 = BSON("_id" << 0 << "x" << 0);
- ASSERT_OK(storage.insertDocument(opCtx, nss, {doc1, SnapshotName(0)}));
+ ASSERT_OK(
+ storage.insertDocument(opCtx, nss, {doc1, SnapshotName(0)}, OpTime::kUninitializedTerm));
auto update = BSON("$set" << BSON("x" << 1));
ASSERT_OK(storage.putSingleton(opCtx, nss, update));
ASSERT_BSONOBJ_EQ(BSON("_id" << 0 << "x" << 1),
@@ -1727,8 +1746,10 @@ TEST_F(StorageInterfaceImplTest, PutSingletonUpdatesFirstDocumentWhenCollectionI
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
auto doc1 = BSON("_id" << 0 << "x" << 0);
auto doc2 = BSON("_id" << 1 << "x" << 1);
- ASSERT_OK(
- storage.insertDocuments(opCtx, nss, {{doc1, SnapshotName(0)}, {doc2, SnapshotName(0)}}));
+ ASSERT_OK(storage.insertDocuments(opCtx,
+ nss,
+ {{doc1, SnapshotName(0), OpTime::kUninitializedTerm},
+ {doc2, SnapshotName(0), OpTime::kUninitializedTerm}}));
auto update = BSON("$set" << BSON("x" << 2));
ASSERT_OK(storage.putSingleton(opCtx, nss, update));
_assertDocumentsInCollectionEquals(opCtx, nss, {BSON("_id" << 0 << "x" << 2), doc2});
@@ -1760,8 +1781,10 @@ TEST_F(StorageInterfaceImplTest, FindByIdReturnsNoSuchKeyWhenDocumentIsNotFound)
auto doc1 = BSON("_id" << 0 << "x" << 0);
auto doc2 = BSON("_id" << 1 << "x" << 1);
auto doc3 = BSON("_id" << 2 << "x" << 2);
- ASSERT_OK(
- storage.insertDocuments(opCtx, nss, {{doc1, SnapshotName(0)}, {doc3, SnapshotName(0)}}));
+ ASSERT_OK(storage.insertDocuments(opCtx,
+ nss,
+ {{doc1, SnapshotName(0), OpTime::kUninitializedTerm},
+ {doc3, SnapshotName(0), OpTime::kUninitializedTerm}}));
ASSERT_EQUALS(ErrorCodes::NoSuchKey, storage.findById(opCtx, nss, doc2["_id"]).getStatus());
}
@@ -1773,8 +1796,11 @@ TEST_F(StorageInterfaceImplTest, FindByIdReturnsDocumentWhenDocumentExists) {
auto doc1 = BSON("_id" << 0 << "x" << 0);
auto doc2 = BSON("_id" << 1 << "x" << 1);
auto doc3 = BSON("_id" << 2 << "x" << 2);
- ASSERT_OK(storage.insertDocuments(
- opCtx, nss, {{doc1, SnapshotName(0)}, {doc2, SnapshotName(0)}, {doc3, SnapshotName(0)}}));
+ ASSERT_OK(storage.insertDocuments(opCtx,
+ nss,
+ {{doc1, SnapshotName(0), OpTime::kUninitializedTerm},
+ {doc2, SnapshotName(0), OpTime::kUninitializedTerm},
+ {doc3, SnapshotName(0), OpTime::kUninitializedTerm}}));
ASSERT_BSONOBJ_EQ(doc2, unittest::assertGet(storage.findById(opCtx, nss, doc2["_id"])));
}
@@ -1805,8 +1831,10 @@ TEST_F(StorageInterfaceImplTest, DeleteByIdReturnsNoSuchKeyWhenDocumentIsNotFoun
auto doc1 = BSON("_id" << 0 << "x" << 0);
auto doc2 = BSON("_id" << 1 << "x" << 1);
auto doc3 = BSON("_id" << 2 << "x" << 2);
- ASSERT_OK(
- storage.insertDocuments(opCtx, nss, {{doc1, SnapshotName(0)}, {doc3, SnapshotName(0)}}));
+ ASSERT_OK(storage.insertDocuments(opCtx,
+ nss,
+ {{doc1, SnapshotName(0), OpTime::kUninitializedTerm},
+ {doc3, SnapshotName(0), OpTime::kUninitializedTerm}}));
ASSERT_EQUALS(ErrorCodes::NoSuchKey, storage.deleteById(opCtx, nss, doc2["_id"]).getStatus());
_assertDocumentsInCollectionEquals(opCtx, nss, {doc1, doc3});
}
@@ -1819,8 +1847,11 @@ TEST_F(StorageInterfaceImplTest, DeleteByIdReturnsDocumentWhenDocumentExists) {
auto doc1 = BSON("_id" << 0 << "x" << 0);
auto doc2 = BSON("_id" << 1 << "x" << 1);
auto doc3 = BSON("_id" << 2 << "x" << 2);
- ASSERT_OK(storage.insertDocuments(
- opCtx, nss, {{doc1, SnapshotName(0)}, {doc2, SnapshotName(0)}, {doc3, SnapshotName(0)}}));
+ ASSERT_OK(storage.insertDocuments(opCtx,
+ nss,
+ {{doc1, SnapshotName(0), OpTime::kUninitializedTerm},
+ {doc2, SnapshotName(0), OpTime::kUninitializedTerm},
+ {doc3, SnapshotName(0), OpTime::kUninitializedTerm}}));
ASSERT_BSONOBJ_EQ(doc2, unittest::assertGet(storage.deleteById(opCtx, nss, doc2["_id"])));
_assertDocumentsInCollectionEquals(opCtx, nss, {doc1, doc3});
}
@@ -1857,11 +1888,12 @@ TEST_F(StorageInterfaceImplTest, UpsertSingleDocumentReplacesExistingDocumentInC
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
auto originalDoc = BSON("_id" << 1 << "x" << 1);
- ASSERT_OK(storage.insertDocuments(opCtx,
- nss,
- {{BSON("_id" << 0 << "x" << 0), SnapshotName(0)},
- {originalDoc, SnapshotName(0)},
- {BSON("_id" << 2 << "x" << 2), SnapshotName(2)}}));
+ ASSERT_OK(storage.insertDocuments(
+ opCtx,
+ nss,
+ {{BSON("_id" << 0 << "x" << 0), SnapshotName(0), OpTime::kUninitializedTerm},
+ {originalDoc, SnapshotName(0), OpTime::kUninitializedTerm},
+ {BSON("_id" << 2 << "x" << 2), SnapshotName(2), OpTime::kUninitializedTerm}}));
ASSERT_OK(storage.upsertById(opCtx, nss, originalDoc["_id"], BSON("x" << 100)));
@@ -1878,10 +1910,11 @@ TEST_F(StorageInterfaceImplTest, UpsertSingleDocumentInsertsNewDocumentInCollect
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- ASSERT_OK(storage.insertDocuments(opCtx,
- nss,
- {{BSON("_id" << 0 << "x" << 0), SnapshotName(0)},
- {BSON("_id" << 2 << "x" << 2), SnapshotName(2)}}));
+ ASSERT_OK(storage.insertDocuments(
+ opCtx,
+ nss,
+ {{BSON("_id" << 0 << "x" << 0), SnapshotName(0), OpTime::kUninitializedTerm},
+ {BSON("_id" << 2 << "x" << 2), SnapshotName(2), OpTime::kUninitializedTerm}}));
ASSERT_OK(storage.upsertById(opCtx, nss, BSON("" << 1).firstElement(), BSON("x" << 100)));
@@ -1906,11 +1939,12 @@ TEST_F(StorageInterfaceImplTest,
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
auto originalDoc = BSON("_id" << 1 << "x" << 1);
- ASSERT_OK(storage.insertDocuments(opCtx,
- nss,
- {{BSON("_id" << 0 << "x" << 0), SnapshotName(0)},
- {originalDoc, SnapshotName(0)},
- {BSON("_id" << 2 << "x" << 2), SnapshotName(2)}}));
+ ASSERT_OK(storage.insertDocuments(
+ opCtx,
+ nss,
+ {{BSON("_id" << 0 << "x" << 0), SnapshotName(0), OpTime::kUninitializedTerm},
+ {originalDoc, SnapshotName(0), OpTime::kUninitializedTerm},
+ {BSON("_id" << 2 << "x" << 2), SnapshotName(2), OpTime::kUninitializedTerm}}));
ASSERT_OK(storage.upsertById(opCtx, nss, originalDoc["_id"], BSON("x" << 100)));
@@ -2020,7 +2054,8 @@ TEST_F(
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
auto doc = BSON("_id" << 0 << "x" << 0);
- ASSERT_OK(storage.insertDocuments(opCtx, nss, {{doc, SnapshotName(0)}}));
+ ASSERT_OK(
+ storage.insertDocuments(opCtx, nss, {{doc, SnapshotName(0), OpTime::kUninitializedTerm}}));
_assertDocumentsInCollectionEquals(opCtx, nss, {doc});
// This test fixture disables replicated writes by default. We want to re-enable this setting
@@ -2045,7 +2080,8 @@ TEST_F(
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
auto doc = BSON("_id" << 0 << "x" << 0);
- ASSERT_OK(storage.insertDocuments(opCtx, nss, {{doc, SnapshotName(0)}}));
+ ASSERT_OK(
+ storage.insertDocuments(opCtx, nss, {{doc, SnapshotName(0), OpTime::kUninitializedTerm}}));
_assertDocumentsInCollectionEquals(opCtx, nss, {doc});
// This test fixture disables replicated writes by default. We want to re-enable this setting
@@ -2188,10 +2224,10 @@ TEST_F(StorageInterfaceImplTest,
<< "def");
ASSERT_OK(storage.insertDocuments(opCtx,
nss,
- {{doc1, SnapshotName(0)},
- {doc2, SnapshotName(0)},
- {doc3, SnapshotName(0)},
- {doc4, SnapshotName(0)}}));
+ {{doc1, SnapshotName(0), OpTime::kUninitializedTerm},
+ {doc2, SnapshotName(0), OpTime::kUninitializedTerm},
+ {doc3, SnapshotName(0), OpTime::kUninitializedTerm},
+ {doc4, SnapshotName(0), OpTime::kUninitializedTerm}}));
// This filter should remove doc1 and doc2 because the values of the field "x"
        // are equivalent to "aBc" under the case-insensitive collation.
@@ -2236,11 +2272,12 @@ TEST_F(StorageInterfaceImplTest, GetCollectionCountReturnsCollectionCount) {
StorageInterfaceImpl storage;
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- ASSERT_OK(storage.insertDocuments(opCtx,
- nss,
- {{BSON("_id" << 1), SnapshotName(0)},
- {BSON("_id" << 2), SnapshotName(0)},
- {BSON("_id" << 0), SnapshotName(0)}}));
+ ASSERT_OK(
+ storage.insertDocuments(opCtx,
+ nss,
+ {{BSON("_id" << 1), SnapshotName(0), OpTime::kUninitializedTerm},
+ {BSON("_id" << 2), SnapshotName(0), OpTime::kUninitializedTerm},
+ {BSON("_id" << 0), SnapshotName(0), OpTime::kUninitializedTerm}}));
auto count = unittest::assertGet(storage.getCollectionCount(opCtx, nss));
ASSERT_EQUALS(3UL, count);
}
@@ -2278,11 +2315,12 @@ TEST_F(StorageInterfaceImplTest, GetCollectionSizeReturnsCollectionSize) {
StorageInterfaceImpl storage;
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- ASSERT_OK(storage.insertDocuments(opCtx,
- nss,
- {{BSON("_id" << 1), SnapshotName(0)},
- {BSON("_id" << 2), SnapshotName(0)},
- {BSON("_id" << 0), SnapshotName(0)}}));
+ ASSERT_OK(
+ storage.insertDocuments(opCtx,
+ nss,
+ {{BSON("_id" << 1), SnapshotName(0), OpTime::kUninitializedTerm},
+ {BSON("_id" << 2), SnapshotName(0), OpTime::kUninitializedTerm},
+ {BSON("_id" << 0), SnapshotName(0), OpTime::kUninitializedTerm}}));
auto size = unittest::assertGet(storage.getCollectionSize(opCtx, nss));
ASSERT_NOT_EQUALS(0UL, size);
}
diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h
index 18902950699..f0c14a61b9e 100644
--- a/src/mongo/db/repl/storage_interface_mock.h
+++ b/src/mongo/db/repl/storage_interface_mock.h
@@ -89,8 +89,10 @@ public:
const CollectionOptions& options,
const BSONObj idIndexSpec,
const std::vector<BSONObj>& secondaryIndexSpecs)>;
- using InsertDocumentFn = stdx::function<Status(
- OperationContext* opCtx, const NamespaceString& nss, const TimestampedBSONObj& doc)>;
+ using InsertDocumentFn = stdx::function<Status(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const TimestampedBSONObj& doc,
+ long long term)>;
using InsertDocumentsFn = stdx::function<Status(OperationContext* opCtx,
const NamespaceString& nss,
const std::vector<InsertStatement>& docs)>;
@@ -137,8 +139,9 @@ public:
Status insertDocument(OperationContext* opCtx,
const NamespaceString& nss,
- const TimestampedBSONObj& doc) override {
- return insertDocumentFn(opCtx, nss, doc);
+ const TimestampedBSONObj& doc,
+ long long term) override {
+ return insertDocumentFn(opCtx, nss, doc, term);
};
Status insertDocuments(OperationContext* opCtx,
@@ -279,10 +282,12 @@ public:
secondaryIndexSpecs) -> StatusWith<std::unique_ptr<CollectionBulkLoader>> {
return Status{ErrorCodes::IllegalOperation, "CreateCollectionForBulkFn not implemented."};
};
- InsertDocumentFn insertDocumentFn =
- [](OperationContext* opCtx, const NamespaceString& nss, const TimestampedBSONObj& doc) {
- return Status{ErrorCodes::IllegalOperation, "InsertDocumentFn not implemented."};
- };
+ InsertDocumentFn insertDocumentFn = [](OperationContext* opCtx,
+ const NamespaceString& nss,
+ const TimestampedBSONObj& doc,
+ long long term) {
+ return Status{ErrorCodes::IllegalOperation, "InsertDocumentFn not implemented."};
+ };
InsertDocumentsFn insertDocumentsFn = [](OperationContext* opCtx,
const NamespaceString& nss,
const std::vector<InsertStatement>& docs) {
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 22c5640e94e..3d14db9c24a 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -490,8 +490,9 @@ void scheduleWritesToOplog(OperationContext* opCtx,
for (size_t i = begin; i < end; i++) {
// Add as unowned BSON to avoid unnecessary ref-count bumps.
// 'ops' will outlive 'docs' so the BSON lifetime will be guaranteed.
- docs.emplace_back(
- InsertStatement{ops[i].raw, SnapshotName(ops[i].getOpTime().getTimestamp())});
+ docs.emplace_back(InsertStatement{ops[i].raw,
+ SnapshotName(ops[i].getOpTime().getTimestamp()),
+ ops[i].getOpTime().getTerm()});
}
fassertStatusOK(40141,
@@ -1214,23 +1215,55 @@ Status multiSyncApply_noAbort(OperationContext* opCtx,
if (isGroup) {
// Since we found more than one document, create grouped insert of many docs.
+ // We are going to group many 'i' ops into one big 'i' op, with array fields for
+ // 'ts', 't', and 'o', corresponding to each individual op.
+ // For example:
+ // { ts: Timestamp(1,1), t:1, ns: "test.foo", op:"i", o: {_id:1} }
+ // { ts: Timestamp(1,2), t:1, ns: "test.foo", op:"i", o: {_id:2} }
+ // become:
+ // { ts: [Timestamp(1, 1), Timestamp(1, 2)],
+ // t: [1, 1],
+ // o: [{_id: 1}, {_id: 2}],
+ // ns: "test.foo",
+ // op: "i" }
BSONObjBuilder groupedInsertBuilder;
- // Generate an op object of all elements except for "o", since we need to
- // make the "o" field an array of all the o's.
+
+ // Populate the "ts" field with an array of all the grouped inserts' timestamps.
+ BSONArrayBuilder tsArrayBuilder(groupedInsertBuilder.subarrayStart("ts"));
+ for (auto groupingIterator = oplogEntriesIterator;
+ groupingIterator != endOfGroupableOpsIterator;
+ ++groupingIterator) {
+ tsArrayBuilder.append((*groupingIterator)->getTimestamp());
+ }
+ tsArrayBuilder.done();
+
+ // Populate the "t" (term) field with an array of all the grouped inserts' terms.
+ BSONArrayBuilder tArrayBuilder(groupedInsertBuilder.subarrayStart("t"));
+ for (auto groupingIterator = oplogEntriesIterator;
+ groupingIterator != endOfGroupableOpsIterator;
+ ++groupingIterator) {
+ tArrayBuilder.append(
+ static_cast<long long>((*groupingIterator)->getTerm().get()));
+ }
+ tArrayBuilder.done();
+
+ // Generate an op object of all elements except for "ts", "t", and "o", since we
+ // need to make those fields arrays of all the ts's, t's, and o's.
for (auto elem : entry->raw) {
- if (elem.fieldNameStringData() != "o") {
+ if (elem.fieldNameStringData() != "o" && elem.fieldNameStringData() != "ts" &&
+ elem.fieldNameStringData() != "t") {
groupedInsertBuilder.append(elem);
}
}
// Populate the "o" field with an array of all the grouped inserts.
- BSONArrayBuilder insertArrayBuilder(groupedInsertBuilder.subarrayStart("o"));
+ BSONArrayBuilder oArrayBuilder(groupedInsertBuilder.subarrayStart("o"));
for (auto groupingIterator = oplogEntriesIterator;
groupingIterator != endOfGroupableOpsIterator;
++groupingIterator) {
- insertArrayBuilder.append((*groupingIterator)->getObject());
+ oArrayBuilder.append((*groupingIterator)->getObject());
}
- insertArrayBuilder.done();
+ oArrayBuilder.done();
try {
// Apply the group of inserts.
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index 5022aa78722..22c9c66ecc4 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -33,6 +33,7 @@
#include <utility>
#include <vector>
+#include "mongo/bson/util/bson_extract.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/collection_catalog_entry.h"
#include "mongo/db/catalog/collection_options.h"
@@ -142,6 +143,20 @@ void createCollection(OperationContext* opCtx,
});
}
+auto parseFromOplogEntryArray(const BSONObj& obj, int elem) {
+ BSONElement tsArray;
+ Status status =
+ bsonExtractTypedField(obj, OpTime::kTimestampFieldName, BSONType::Array, &tsArray);
+ ASSERT_OK(status);
+
+ BSONElement termArray;
+ status = bsonExtractTypedField(obj, OpTime::kTermFieldName, BSONType::Array, &termArray);
+ ASSERT_OK(status);
+
+ return OpTime(tsArray.Array()[elem].timestamp(), termArray.Array()[elem].Long());
+};
+
+
TEST_F(SyncTailTest, SyncApplyNoNamespaceBadOp) {
const BSONObj op = BSON("op"
<< "x");
@@ -683,7 +698,7 @@ TEST_F(SyncTailTest, MultiSyncApplyGroupsInsertOperationByNamespaceBeforeApplyin
ASSERT_BSONOBJ_EQ(createOp2.raw, operationsApplied[1]);
// Check grouped insert operations in namespace "nss1".
- ASSERT_EQUALS(insertOp1a.getOpTime(), OpTime::parseFromOplogEntry(operationsApplied[2]));
+ ASSERT_EQUALS(insertOp1a.getOpTime(), parseFromOplogEntryArray(operationsApplied[2], 0));
ASSERT_EQUALS(insertOp1a.getNamespace().ns(), operationsApplied[2]["ns"].valuestrsafe());
ASSERT_EQUALS(BSONType::Array, operationsApplied[2]["o"].type());
auto group1 = operationsApplied[2]["o"].Array();
@@ -692,7 +707,7 @@ TEST_F(SyncTailTest, MultiSyncApplyGroupsInsertOperationByNamespaceBeforeApplyin
ASSERT_BSONOBJ_EQ(insertOp1b.getObject(), group1[1].Obj());
// Check grouped insert operations in namespace "nss2".
- ASSERT_EQUALS(insertOp2a.getOpTime(), OpTime::parseFromOplogEntry(operationsApplied[3]));
+ ASSERT_EQUALS(insertOp2a.getOpTime(), parseFromOplogEntryArray(operationsApplied[3], 0));
ASSERT_EQUALS(insertOp2a.getNamespace().ns(), operationsApplied[3]["ns"].valuestrsafe());
ASSERT_EQUALS(BSONType::Array, operationsApplied[3]["o"].type());
auto group2 = operationsApplied[3]["o"].Array();
@@ -738,7 +753,7 @@ TEST_F(SyncTailTest, MultiSyncApplyLimitsBatchCountWhenGroupingInsertOperation)
ASSERT_BSONOBJ_EQ(createOp.raw, operationsApplied[0]);
const auto& groupedInsertOp = operationsApplied[1];
- ASSERT_EQUALS(insertOps.front().getOpTime(), OpTime::parseFromOplogEntry(groupedInsertOp));
+ ASSERT_EQUALS(insertOps.front().getOpTime(), parseFromOplogEntryArray(groupedInsertOp, 0));
ASSERT_EQUALS(insertOps.front().getNamespace().ns(), groupedInsertOp["ns"].valuestrsafe());
ASSERT_EQUALS(BSONType::Array, groupedInsertOp["o"].type());
auto groupedInsertDocuments = groupedInsertOp["o"].Array();
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index 0e53440f002..dc0fa530444 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -1032,15 +1032,22 @@ void WiredTigerKVEngine::_setOldestTimestamp(SnapshotName oldestTimestamp) {
return;
}
}
+ auto timestampToSet = _previousSetOldestTimestamp;
+ _previousSetOldestTimestamp = oldestTimestamp;
+ if (timestampToSet == SnapshotName()) {
+ // Nothing to set yet.
+ return;
+ }
+
char oldestTSConfigString["oldest_timestamp="_sd.size() + (8 * 2) /* 16 hexadecimal digits */ +
1 /* trailing null */];
auto size = std::snprintf(oldestTSConfigString,
sizeof(oldestTSConfigString),
"oldest_timestamp=%llx",
- static_cast<unsigned long long>(oldestTimestamp.asU64()));
+ static_cast<unsigned long long>(timestampToSet.asU64()));
invariant(static_cast<std::size_t>(size) < sizeof(oldestTSConfigString));
invariantWTOK(_conn->set_timestamp(_conn, oldestTSConfigString));
- LOG(2) << "oldest_timestamp set to " << oldestTimestamp.asU64();
+ LOG(2) << "oldest_timestamp set to " << timestampToSet.asU64();
}
void WiredTigerKVEngine::setInitialDataTimestamp(SnapshotName initialDataTimestamp) {
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
index d4333b2da4a..e08fa53f2c6 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
@@ -235,7 +235,9 @@ private:
std::string _uri(StringData ident) const;
+ // Not threadsafe; callers must be serialized.
void _setOldestTimestamp(SnapshotName oldestTimestamp);
+ SnapshotName _previousSetOldestTimestamp;
WT_CONNECTION* _conn;
WT_EVENT_HANDLER _eventHandler;
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
index df8cc33266d..8e700112d0d 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
@@ -1157,17 +1157,17 @@ Status WiredTigerRecordStore::_insertRecords(OperationContext* opCtx,
for (size_t i = 0; i < nRecords; i++) {
auto& record = records[i];
- if (_isOplog) {
- Timestamp ts;
- if (timestamps[i].isNull()) {
- // If the timestamp is 0, that probably means someone inserted a document directly
- // into the oplog. In this case, use the RecordId as the timestamp, since they are
- // one and the same.
- ts = Timestamp(record.id.repr());
- } else {
- ts = timestamps[i];
- }
- LOG(4) << "inserting record into oplog with timestamp " << ts.asULL();
+ Timestamp ts;
+ if (timestamps[i].isNull() && _isOplog) {
+ // If the timestamp is 0, that probably means someone inserted a document directly
+ // into the oplog. In this case, use the RecordId as the timestamp, since they are
+ // one and the same.
+ ts = Timestamp(record.id.repr());
+ } else {
+ ts = timestamps[i];
+ }
+ LOG(4) << "inserting record with timestamp " << ts.asULL();
+ if (!ts.isNull()) {
fassertStatusOK(39001, opCtx->recoveryUnit()->setTimestamp(SnapshotName(ts)));
}
setKey(c, record.id);
diff --git a/src/mongo/dbtests/SConscript b/src/mongo/dbtests/SConscript
index 1db90bb2605..ca98dec10cb 100644
--- a/src/mongo/dbtests/SConscript
+++ b/src/mongo/dbtests/SConscript
@@ -110,6 +110,7 @@ dbtest = env.Program(
'repltests.cpp',
'rollbacktests.cpp',
'socktests.cpp',
+ 'storage_timestamp_tests.cpp',
'threadedtests.cpp',
'updatetests.cpp',
'validate_tests.cpp',
@@ -119,7 +120,10 @@ dbtest = env.Program(
"$BUILD_DIR/mongo/db/auth/authmocks",
"$BUILD_DIR/mongo/db/bson/dotted_path_support",
"$BUILD_DIR/mongo/db/concurrency/deferred_writer",
+ "$BUILD_DIR/mongo/db/logical_clock",
+ "$BUILD_DIR/mongo/db/logical_time_metadata_hook",
"$BUILD_DIR/mongo/db/op_observer_d",
+ "$BUILD_DIR/mongo/db/pipeline/document_value_test_util",
"$BUILD_DIR/mongo/db/query/collation/collator_interface_mock",
"$BUILD_DIR/mongo/db/query/query",
"$BUILD_DIR/mongo/db/query/query_test_service_context",
@@ -129,14 +133,11 @@ dbtest = env.Program(
"$BUILD_DIR/mongo/db/repl/replmocks",
"$BUILD_DIR/mongo/db/serveronly",
"$BUILD_DIR/mongo/db/sessions_collection_standalone",
- "$BUILD_DIR/mongo/db/logical_clock",
- "$BUILD_DIR/mongo/db/logical_time_metadata_hook",
"$BUILD_DIR/mongo/db/storage/mmap_v1/paths",
+ "$BUILD_DIR/mongo/util/clock_source_mock",
"$BUILD_DIR/mongo/util/net/network",
"$BUILD_DIR/mongo/util/progress_meter",
"$BUILD_DIR/mongo/util/version_impl",
- '$BUILD_DIR/mongo/db/pipeline/document_value_test_util',
- '$BUILD_DIR/mongo/util/clock_source_mock',
"mocklib",
"testframework",
],
diff --git a/src/mongo/dbtests/dbtests.cpp b/src/mongo/dbtests/dbtests.cpp
index 7ae04b93520..c4f26023093 100644
--- a/src/mongo/dbtests/dbtests.cpp
+++ b/src/mongo/dbtests/dbtests.cpp
@@ -51,6 +51,7 @@
#include "mongo/dbtests/framework.h"
#include "mongo/scripting/engine.h"
#include "mongo/stdx/memory.h"
+#include "mongo/util/clock_source_mock.h"
#include "mongo/util/quick_exit.h"
#include "mongo/util/signal_handlers_synchronous.h"
#include "mongo/util/startup_test.h"
@@ -120,6 +121,9 @@ int dbtestsMain(int argc, char** argv, char** envp) {
::mongo::setupSynchronousSignalHandlers();
mongo::dbtests::initWireSpec();
mongo::runGlobalInitializersOrDie(argc, argv, envp);
+ serverGlobalParams.featureCompatibility.version.store(
+ ServerGlobalParams::FeatureCompatibility::Version::k36);
+ serverGlobalParams.featureCompatibility.isSchemaVersion36.store(true);
repl::ReplSettings replSettings;
replSettings.setOplogSizeBytes(10 * 1024 * 1024);
ServiceContext* service = getGlobalServiceContext();
@@ -127,11 +131,26 @@ int dbtestsMain(int argc, char** argv, char** envp) {
auto logicalClock = stdx::make_unique<LogicalClock>(service);
LogicalClock::set(service, std::move(logicalClock));
+ auto fastClock = stdx::make_unique<ClockSourceMock>();
+ // Timestamps are split into two 32-bit integers, seconds and "increments". Currently (but
+ // maybe not for eternity), a Timestamp with a value of `0` seconds is always considered
+ // "null" by `Timestamp::isNull`, regardless of its increment value. Ticking the
+ // `ClockSourceMock` only bumps the "increment" counter, thus by default, generating "null"
+ // timestamps. Bumping by one second here avoids any accidental interpretations.
+ fastClock->advance(Seconds(1));
+ service->setFastClockSource(std::move(fastClock));
+
+ auto preciseClock = stdx::make_unique<ClockSourceMock>();
+ // See above.
+ preciseClock->advance(Seconds(1));
+ service->setPreciseClockSource(std::move(preciseClock));
+
repl::setGlobalReplicationCoordinator(
new repl::ReplicationCoordinatorMock(service, replSettings));
repl::getGlobalReplicationCoordinator()
->setFollowerMode(repl::MemberState::RS_PRIMARY)
.ignore();
+
getGlobalAuthorizationManager()->setAuthEnabled(false);
ScriptEngine::setup();
StartupTest::runTests();
diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp
new file mode 100644
index 00000000000..e404ca16f15
--- /dev/null
+++ b/src/mongo/dbtests/storage_timestamp_tests.cpp
@@ -0,0 +1,304 @@
+/**
+ * Copyright (C) 2017 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <cstdint>
+
+#include "mongo/bson/simple_bsonobj_comparator.h"
+#include "mongo/db/catalog/collection.h"
+#include "mongo/db/catalog/index_catalog.h"
+#include "mongo/db/catalog/index_create.h"
+#include "mongo/db/client.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/db.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/dbhelpers.h"
+#include "mongo/db/index/index_descriptor.h"
+#include "mongo/db/logical_clock.h"
+#include "mongo/db/op_observer_impl.h"
+#include "mongo/db/repl/apply_ops.h"
+#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/repl/replication_coordinator_global.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/service_context.h"
+#include "mongo/db/storage/kv/kv_storage_engine.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+
+/**
+ * Shared fixture for storage-timestamp tests. Stands up a mock replication
+ * coordinator (replSet "rs0") with writes always allowed, installs the real
+ * OpObserverImpl, creates the oplog, and advances the logical clock to (1, 0)
+ * so generated timestamps are never "null". All setup/teardown is a no-op on
+ * storage engines other than wiredTiger, which is the only engine that
+ * timestamps writes to-date.
+ */
+class StorageTimestampTest {
+public:
+    ServiceContext::UniqueOperationContext _opCtxRaii = cc().makeOperationContext();
+    OperationContext* _opCtx = _opCtxRaii.get();
+    LogicalClock* _clock = LogicalClock::get(_opCtx);
+
+    StorageTimestampTest() {
+        if (mongo::storageGlobalParams.engine != "wiredTiger") {
+            return;
+        }
+
+        repl::ReplSettings replSettings;
+        replSettings.setOplogSizeBytes(10 * 1024 * 1024);
+        replSettings.setReplSetString("rs0");
+        auto coordinatorMock =
+            new repl::ReplicationCoordinatorMock(_opCtx->getServiceContext(), replSettings);
+        coordinatorMock->alwaysAllowWrites(true);
+        setGlobalReplicationCoordinator(coordinatorMock);
+
+        // Since the Client object persists across tests, even though the global
+        // ReplicationCoordinator does not, we need to clear the last op associated with the client
+        // to avoid the invariant in ReplClientInfo::setLastOp that the optime only goes forward.
+        repl::ReplClientInfo::forClient(_opCtx->getClient()).clearLastOp_forTest();
+
+        getGlobalServiceContext()->setOpObserver(stdx::make_unique<OpObserverImpl>());
+
+        repl::setOplogCollectionName();
+        repl::createOplog(_opCtx);
+
+        // A "null" (zero-seconds) timestamp must never be handed out; start at (1, 0).
+        ASSERT_OK(_clock->advanceClusterTime(LogicalTime(Timestamp(1, 0))));
+    }
+
+    ~StorageTimestampTest() {
+        if (mongo::storageGlobalParams.engine != "wiredTiger") {
+            return;
+        }
+
+        try {
+            reset(NamespaceString("local.oplog.rs"));
+        } catch (...) {
+            FAIL("Exception while cleaning up test");
+        }
+    }
+
+    /**
+     * Walking on ice: resetting the ReplicationCoordinator destroys the underlying
+     * `DropPendingCollectionReaper`. Use a truncate/dropAllIndexes to clean out a collection
+     * without actually dropping it. Creates the collection if it does not exist yet.
+     */
+    void reset(const NamespaceString& nss) const {
+        ::mongo::writeConflictRetry(_opCtx, "deleteAll", nss.ns(), [&] {
+            // Read/write against the latest data, not a point-in-time snapshot.
+            invariant(_opCtx->recoveryUnit()->selectSnapshot(SnapshotName::min()).isOK());
+            AutoGetCollection collRaii(_opCtx, nss, LockMode::MODE_X);
+
+            if (collRaii.getCollection()) {
+                WriteUnitOfWork wunit(_opCtx);
+                invariant(collRaii.getCollection()->truncate(_opCtx).isOK());
+                collRaii.getCollection()->getIndexCatalog()->dropAllIndexes(_opCtx, false);
+                wunit.commit();
+                return;
+            }
+
+            AutoGetOrCreateDb dbRaii(_opCtx, nss.db(), LockMode::MODE_X);
+            WriteUnitOfWork wunit(_opCtx);
+            invariant(dbRaii.getDb()->createCollection(_opCtx, nss.ns()));
+            wunit.commit();
+        });
+    }
+
+    /**
+     * Inserts `stmt` into `coll`, asserting success. No quota enforcement; not
+     * marked as a migration write.
+     */
+    void insertDocument(Collection* coll, const InsertStatement& stmt) {
+        OpDebug* const nullOpDebug = nullptr;
+        const bool enforceQuota = false;
+        const bool fromMigrate = false;
+        ASSERT_OK(coll->insertDocument(_opCtx, stmt, nullOpDebug, enforceQuota, fromMigrate));
+    }
+
+    /**
+     * Returns the number of records in `coll` by walking a record-store cursor.
+     */
+    std::int32_t itCount(Collection* coll) {
+        // Accumulate in the return type directly; the previous std::uint64_t
+        // accumulator was silently narrowed (and sign-converted) on return.
+        std::int32_t ret = 0;
+        auto cursor = coll->getRecordStore()->getCursor(_opCtx);
+        while (cursor->next() != boost::none) {
+            ++ret;
+        }
+
+        return ret;
+    }
+
+    /**
+     * Returns the first record in `coll` as BSON; asserts the collection is non-empty.
+     */
+    BSONObj findOne(Collection* coll) {
+        auto optRecord = coll->getRecordStore()->getCursor(_opCtx)->next();
+        ASSERT(optRecord != boost::none);
+        return optRecord.get().data.toBson();
+    }
+};
+
+/**
+ * Verifies that inserts applied via `applyOps` in unreplicated-writes mode are
+ * stamped with each operation's "ts" field: after applying one insert per
+ * reserved clock tick, reading the collection at each successive timestamp
+ * snapshot must surface exactly the document inserted at that timestamp.
+ */
+class SecondaryInsertTimes : public StorageTimestampTest {
+public:
+    void run() {
+        // Only run on 'wiredTiger'. No other storage engines to-date timestamp writes.
+        if (mongo::storageGlobalParams.engine != "wiredTiger") {
+            return;
+        }
+
+        // In order for applyOps to assign timestamps, we must be in non-replicated mode.
+        repl::UnreplicatedWritesBlock uwb(_opCtx);
+
+        // Create a new collection.
+        NamespaceString nss("unittests.timestampedUpdates");
+        reset(nss);
+
+        AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_X, LockMode::MODE_IX);
+
+        // Reserve one distinct logical-clock tick per document up front, so each
+        // applyOps entry below can carry its own strictly-increasing "ts".
+        const std::uint32_t docsToInsert = 10;
+        const LogicalTime firstInsertTime = _clock->reserveTicks(docsToInsert);
+        for (std::uint32_t idx = 0; idx < docsToInsert; ++idx) {
+            BSONObjBuilder result;
+            // Each applyOps carries the real insert plus a trailing no-op
+            // applyOps command entry (op: "c" on test.$cmd with an empty ops
+            // array) at the same timestamp.
+            ASSERT_OK(applyOps(
+                _opCtx,
+                nss.db().toString(),
+                BSON("applyOps" << BSON_ARRAY(
+                         BSON("ts" << firstInsertTime.addTicks(idx).asTimestamp() << "t" << 1LL
+                                   << "h"
+                                   << 0xBEEFBEEFLL
+                                   << "v"
+                                   << 2
+                                   << "op"
+                                   << "i"
+                                   << "ns"
+                                   << nss.ns()
+                                   << "ui"
+                                   << autoColl.getCollection()->uuid().get()
+                                   << "o"
+                                   << BSON("_id" << idx))
+                         << BSON("ts" << firstInsertTime.addTicks(idx).asTimestamp() << "t" << 1LL
+                                      << "h"
+                                      << 1
+                                      << "op"
+                                      << "c"
+                                      << "ns"
+                                      << "test.$cmd"
+                                      << "o"
+                                      << BSON("applyOps" << BSONArrayBuilder().obj())))),
+                &result));
+        }
+
+        // Read back at each reserved timestamp: the snapshot at tick `idx` must
+        // contain (as its last document) exactly {_id: idx}.
+        for (std::uint32_t idx = 0; idx < docsToInsert; ++idx) {
+            auto recoveryUnit = _opCtx->recoveryUnit();
+            recoveryUnit->abandonSnapshot();
+            ASSERT_OK(recoveryUnit->selectSnapshot(
+                SnapshotName(firstInsertTime.addTicks(idx).asTimestamp())));
+            BSONObj result;
+            ASSERT(Helpers::getLast(_opCtx, nss.ns().c_str(), result)) << " idx is " << idx;
+            ASSERT_EQ(0, SimpleBSONObjComparator::kInstance.compare(result, BSON("_id" << idx)))
+                << "Doc: " << result.toString() << " Expected: " << BSON("_id" << idx);
+        }
+    }
+};
+
+/**
+ * Same contract as SecondaryInsertTimes, but for "vectored" (grouped) inserts:
+ * a single applyOps operation whose "ts", "t" and "o" fields are parallel
+ * arrays, one element per grouped document. Each grouped document must still
+ * become visible at its own array-element timestamp.
+ */
+class SecondaryArrayInsertTimes : public StorageTimestampTest {
+public:
+    void run() {
+        // Only run on 'wiredTiger'. No other storage engines to-date timestamp writes.
+        if (mongo::storageGlobalParams.engine != "wiredTiger") {
+            return;
+        }
+
+        // In order for applyOps to assign timestamps, we must be in non-replicated mode.
+        repl::UnreplicatedWritesBlock uwb(_opCtx);
+
+        // Create a new collection.
+        NamespaceString nss("unittests.timestampedUpdates");
+        reset(nss);
+
+        AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_X, LockMode::MODE_IX);
+
+        // Reserve one logical-clock tick per grouped document.
+        const std::uint32_t docsToInsert = 10;
+        const LogicalTime firstInsertTime = _clock->reserveTicks(docsToInsert);
+        BSONObjBuilder fullCommand;
+        BSONArrayBuilder applyOpsB(fullCommand.subarrayStart("applyOps"));
+
+        BSONObjBuilder applyOpsElem1Builder;
+
+        // Populate the "ts" field with an array of all the grouped inserts' timestamps.
+        BSONArrayBuilder tsArrayBuilder(applyOpsElem1Builder.subarrayStart("ts"));
+        for (std::uint32_t idx = 0; idx < docsToInsert; ++idx) {
+            tsArrayBuilder.append(firstInsertTime.addTicks(idx).asTimestamp());
+        }
+        tsArrayBuilder.done();
+
+        // Populate the "t" (term) field with an array of all the grouped inserts' terms.
+        BSONArrayBuilder tArrayBuilder(applyOpsElem1Builder.subarrayStart("t"));
+        for (std::uint32_t idx = 0; idx < docsToInsert; ++idx) {
+            tArrayBuilder.append(1LL);
+        }
+        tArrayBuilder.done();
+
+        // Populate the "o" field with an array of all the grouped inserts.
+        BSONArrayBuilder oArrayBuilder(applyOpsElem1Builder.subarrayStart("o"));
+        for (std::uint32_t idx = 0; idx < docsToInsert; ++idx) {
+            oArrayBuilder.append(BSON("_id" << idx));
+        }
+        oArrayBuilder.done();
+
+        // The remaining scalar fields ("h", "v", "op", "ns", "ui") are shared by
+        // every document in the group.
+        applyOpsElem1Builder << "h" << 0xBEEFBEEFLL << "v" << 2 << "op"
+                             << "i"
+                             << "ns" << nss.ns() << "ui" << autoColl.getCollection()->uuid().get();
+
+        applyOpsB.append(applyOpsElem1Builder.done());
+
+        // Trailing no-op applyOps command entry, timestamped one tick past the
+        // grouped inserts.
+        BSONObjBuilder applyOpsElem2Builder;
+        applyOpsElem2Builder << "ts" << firstInsertTime.addTicks(docsToInsert).asTimestamp() << "t"
+                             << 1LL << "h" << 1 << "op"
+                             << "c"
+                             << "ns"
+                             << "test.$cmd"
+                             << "o" << BSON("applyOps" << BSONArrayBuilder().obj());
+
+        applyOpsB.append(applyOpsElem2Builder.done());
+        applyOpsB.done();
+        // Apply the group of inserts.
+        BSONObjBuilder result;
+        ASSERT_OK(applyOps(_opCtx, nss.db().toString(), fullCommand.done(), &result));
+
+
+        // Read back at each reserved timestamp: the snapshot at tick `idx` must
+        // contain (as its last document) exactly {_id: idx}.
+        for (std::uint32_t idx = 0; idx < docsToInsert; ++idx) {
+            auto recoveryUnit = _opCtx->recoveryUnit();
+            recoveryUnit->abandonSnapshot();
+            ASSERT_OK(recoveryUnit->selectSnapshot(
+                SnapshotName(firstInsertTime.addTicks(idx).asTimestamp())));
+            BSONObj result;
+            ASSERT(Helpers::getLast(_opCtx, nss.ns().c_str(), result)) << " idx is " << idx;
+            ASSERT_EQ(0, SimpleBSONObjComparator::kInstance.compare(result, BSON("_id" << idx)))
+                << "Doc: " << result.toString() << " Expected: " << BSON("_id" << idx);
+        }
+    }
+};
+
+/**
+ * Registers every storage-timestamp test case under the suite name
+ * "StorageTimestampTests". The static SuiteInstance below instantiates the
+ * suite at program startup via the dbtests framework.
+ */
+class AllStorageTimestampTests : public unittest::Suite {
+public:
+    AllStorageTimestampTests() : unittest::Suite("StorageTimestampTests") {}
+    void setupTests() {
+        add<SecondaryInsertTimes>();
+        add<SecondaryArrayInsertTimes>();
+    }
+};
+
+unittest::SuiteInstance<AllStorageTimestampTests> allStorageTimestampTests;
+} // namespace mongo