author     Judah Schvimer <judah@mongodb.com>  2017-05-17 11:46:22 -0400
committer  Judah Schvimer <judah@mongodb.com>  2017-05-17 11:46:22 -0400
commit     2482a5807dfa624f358569c727ff3b96ccfd8922 (patch)
tree       ffd8f4b949e3187705900b3204d05dcc344ab26f
parent     f92d87b3cbe19941f7cf27b0fbf82af818503866 (diff)
download   mongo-2482a5807dfa624f358569c727ff3b96ccfd8922.tar.gz
SERVER-28846 Use IDL to generate oplog entry parsers
-rw-r--r--  jstests/replsets/rollback_cmd_unrollbackable.js              2
-rw-r--r--  jstests/replsets/rollback_fake_cmd.js                       90
-rw-r--r--  src/mongo/db/repl/SConscript                                 7
-rw-r--r--  src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp   13
-rw-r--r--  src/mongo/db/repl/initial_syncer.cpp                         2
-rw-r--r--  src/mongo/db/repl/initial_syncer_test.cpp                  254
-rw-r--r--  src/mongo/db/repl/multiapplier_test.cpp                     37
-rw-r--r--  src/mongo/db/repl/oplog_entry.cpp                          164
-rw-r--r--  src/mongo/db/repl/oplog_entry.h                            114
-rw-r--r--  src/mongo/db/repl/oplog_entry.idl                           97
-rw-r--r--  src/mongo/db/repl/sync_source_resolver_test.cpp             17
-rw-r--r--  src/mongo/db/repl/sync_tail.cpp                             24
-rw-r--r--  src/mongo/db/repl/sync_tail_test.cpp                        56
-rw-r--r--  src/mongo/util/uuid.h                                        5
14 files changed, 495 insertions, 387 deletions
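
For context (an illustrative sketch, not part of the commit): the change replaces hand-built BSON oplog entries with a typed OplogEntry that is constructed from its fields and parsed through an IDL-generated base class. Roughly the round trip the updated tests rely on, assuming the constructors, toBSON(), and static parse() shown in the oplog_entry.h diff below:

    #include "mongo/bson/bsonobjbuilder.h"
    #include "mongo/db/repl/oplog_entry.h"

    using namespace mongo;
    using namespace mongo::repl;

    void exampleRoundTrip() {
        // Build a typed entry instead of assembling raw BSON by hand.
        OplogEntry entry(OpTime(Timestamp(1, 1), 1),     // optime ('ts' + 't')
                         1LL,                            // hash ('h')
                         OpTypeEnum::kInsert,            // op type ('op')
                         NamespaceString("a.a"),         // namespace ('ns')
                         BSON("_id" << 1 << "a" << 1));  // object ('o')

        // Serialize and re-parse through the non-throwing parser.
        BSONObj raw = entry.toBSON();
        StatusWith<OplogEntry> parsed = OplogEntry::parse(raw);
        invariant(parsed.isOK());
        invariant(parsed.getValue().getOpTime() == entry.getOpTime());
        invariant(parsed.getValue().getHash() == entry.getHash());
    }
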
diff --git a/jstests/replsets/rollback_cmd_unrollbackable.js b/jstests/replsets/rollback_cmd_unrollbackable.js
index 661f9ead68a..0abc3384db6 100644
--- a/jstests/replsets/rollback_cmd_unrollbackable.js
+++ b/jstests/replsets/rollback_cmd_unrollbackable.js
@@ -49,7 +49,7 @@ var oplog_entry = b_conn.getDB("local").oplog.rs.find().sort({$natural: -1})[0];
oplog_entry["ts"] = Timestamp(oplog_entry["ts"].t, oplog_entry["ts"].i + 1);
oplog_entry["op"] = "c";
oplog_entry["o"] = {
- "replSetSyncFrom": 1
+ "emptycapped": 1
};
assert.writeOK(b_conn.getDB("local").oplog.rs.insert(oplog_entry));
diff --git a/jstests/replsets/rollback_fake_cmd.js b/jstests/replsets/rollback_fake_cmd.js
deleted file mode 100644
index bbc82e56156..00000000000
--- a/jstests/replsets/rollback_fake_cmd.js
+++ /dev/null
@@ -1,90 +0,0 @@
-// test that a rollback of a nonexistent command causes a message to be logged
-//
-// If all data-bearing nodes in a replica set are using an ephemeral storage engine, the set will
-// not be able to survive a scenario where all data-bearing nodes are down simultaneously. In such a
-// scenario, none of the members will have any data, and upon restart will each look for a member to
-// inital sync from, so no primary will be elected. This test induces such a scenario, so cannot be
-// run on ephemeral storage engines.
-// @tags: [requires_persistence]
-
-// function to check the logs for an entry
-doesEntryMatch = function(array, regex) {
- var found = false;
- for (i = 0; i < array.length; i++) {
- if (regex.test(array[i])) {
- found = true;
- }
- }
- return found;
-};
-
-// set up a set and grab things for later
-var name = "rollback_fake_cmd";
-var replTest = new ReplSetTest({name: name, nodes: 3});
-var nodes = replTest.nodeList();
-var conns = replTest.startSet();
-replTest.initiate({
- "_id": name,
- "members": [
- {"_id": 0, "host": nodes[0], priority: 3},
- {"_id": 1, "host": nodes[1]},
- {"_id": 2, "host": nodes[2], arbiterOnly: true}
- ]
-});
-var a_conn = conns[0];
-var b_conn = conns[1];
-var AID = replTest.getNodeId(a_conn);
-var BID = replTest.getNodeId(b_conn);
-
-replTest.waitForState(replTest.nodes[0], ReplSetTest.State.PRIMARY);
-
-// get master and do an initial write
-var master = replTest.getPrimary();
-assert(master === conns[0], "conns[0] assumed to be master");
-assert(a_conn.host === master.host, "a_conn assumed to be master");
-var options = {writeConcern: {w: 2, wtimeout: 60000}, upsert: true};
-assert.writeOK(a_conn.getDB(name).foo.insert({x: 1}, options));
-
-// shut down master
-replTest.stop(AID);
-
-// insert a fake oplog entry with a nonexistent command
-master = replTest.getPrimary();
-assert(b_conn.host === master.host, "b_conn assumed to be master");
-options = {
- writeConcern: {w: 1, wtimeout: 60000},
- upsert: true
-};
-// another insert to set minvalid ahead
-assert.writeOK(b_conn.getDB(name).foo.insert({x: 123}));
-var oplog_entry = b_conn.getDB("local").oplog.rs.find().sort({$natural: -1})[0];
-oplog_entry.ts = Timestamp(oplog_entry.ts.t, oplog_entry.ts.i + 1);
-oplog_entry.op = "c";
-oplog_entry.o = {
- fake_command_name: 1
-};
-assert.writeOK(b_conn.getDB("local").oplog.rs.insert(oplog_entry));
-jsTestLog('inserted oplog entry with invalid command: ' + tojson(oplog_entry));
-
-// shut down B and bring back the original master
-replTest.stop(BID);
-replTest.restart(AID);
-master = replTest.getPrimary();
-assert(a_conn.host === master.host, "a_conn assumed to be master");
-
-// do a write so that B will have to roll back
-options = {
- writeConcern: {w: 1, wtimeout: 60000},
- upsert: true
-};
-assert.writeOK(a_conn.getDB(name).foo.insert({x: 2}, options));
-
-// restart B, which should rollback and log a message about not rolling back the nonexistent cmd
-clearRawMongoProgramOutput();
-replTest.restart(BID);
-var msg = RegExp("rollback no such command fake_command_name");
-assert.soon(function() {
- return rawMongoProgramOutput().match(msg);
-}, "Did not see a log entry about skipping the nonexistent command during rollback");
-
-replTest.stopSet(undefined, undefined, {allowedExitCodes: [MongoRunner.EXIT_ABRUPT]});
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 3b84bb828b6..15ec4ed6ce2 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -486,11 +486,13 @@ env.CppUnitTest(
env.Library(
target='oplog_entry',
source=[
- 'oplog_entry.cpp',
+ 'oplog_entry.cpp',
+ env.Idlc('oplog_entry.idl')[0],
],
LIBDEPS=[
+ 'optime',
'$BUILD_DIR/mongo/base',
- 'optime'
+ '$BUILD_DIR/mongo/idl/idl_parser',
],
)
@@ -884,6 +886,7 @@ env.Library(
],
LIBDEPS=[
'abstract_oplog_fetcher',
+ 'oplog_entry',
'$BUILD_DIR/mongo/unittest/task_executor_proxy',
'$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
],
diff --git a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp b/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp
index e7aeb7dfd0d..98c1bd08957 100644
--- a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp
+++ b/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp
@@ -30,6 +30,7 @@
#include "mongo/db/repl/abstract_oplog_fetcher_test_fixture.h"
+#include "mongo/db/repl/oplog_entry.h"
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/unittest/unittest.h"
@@ -47,12 +48,12 @@ void ShutdownState::operator()(const Status& status) {
}
BSONObj AbstractOplogFetcherTest::makeNoopOplogEntry(OpTimeWithHash opTimeWithHash) {
- BSONObjBuilder bob;
- bob.appendElements(opTimeWithHash.opTime.toBSON());
- bob.append("h", opTimeWithHash.value);
- bob.append("op", "c");
- bob.append("ns", "test.t");
- return bob.obj();
+ return OplogEntry(opTimeWithHash.opTime,
+ opTimeWithHash.value,
+ OpTypeEnum::kNoop,
+ NamespaceString("test.t"),
+ BSONObj())
+ .toBSON();
}
BSONObj AbstractOplogFetcherTest::makeNoopOplogEntry(OpTime opTime, long long hash) {
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index 4fc672ba3fa..07c5b4162cf 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -1394,7 +1394,7 @@ StatusWith<Operations> InitialSyncer::_getNextApplierBatch_inlock() {
// Index builds are achieved through the use of an insert op, not a command op.
// The following line is the same as what the insert code uses to detect an index
// build.
- (entry.hasNamespace() && entry.getCollectionName() == "system.indexes")) {
+ (entry.getNamespace().isSystemDotIndexes())) {
if (ops.empty()) {
// Apply commands one-at-a-time.
ops.push_back(std::move(entry));
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index e36e2955557..1821e3f491d 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -479,15 +479,27 @@ BSONObj makeListDatabasesResponse(std::vector<std::string> databaseNames) {
/**
* Generates oplog entries with the given number used for the timestamp.
*/
-BSONObj makeOplogEntry(int t, const char* opType = "i", int version = OplogEntry::kOplogVersion) {
- return BSON("ts" << Timestamp(t, 1) << "h" << static_cast<long long>(t) << "ns"
- << "a.a"
- << "v"
- << version
- << "op"
- << opType
- << "o"
- << BSON("_id" << t << "a" << t));
+OplogEntry makeOplogEntry(int t,
+ OpTypeEnum opType = OpTypeEnum::kInsert,
+ int version = OplogEntry::kOplogVersion) {
+ BSONObj oField = BSON("_id" << t << "a" << t);
+ if (opType == OpTypeEnum::kCommand) {
+ // Insert an arbitrary command name so that the oplog entry is valid.
+ oField = BSON("dropIndexes"
+ << "a_1");
+ }
+ return OplogEntry(OpTime(Timestamp(t, 1), 1),
+ static_cast<long long>(t),
+ opType,
+ NamespaceString("a.a"),
+ version,
+ oField);
+}
+
+BSONObj makeOplogEntryObj(int t,
+ OpTypeEnum opType = OpTypeEnum::kInsert,
+ int version = OplogEntry::kOplogVersion) {
+ return makeOplogEntry(t, opType, version).toBSON();
}
void InitialSyncerTest::processSuccessfulLastOplogEntryFetcherResponse(std::vector<BSONObj> docs) {
@@ -1118,7 +1130,7 @@ TEST_F(InitialSyncerTest,
ASSERT_TRUE(initialSyncer->isActive());
// Last oplog entry second attempt.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
}
TEST_F(InitialSyncerTest,
@@ -1188,7 +1200,7 @@ TEST_F(InitialSyncerTest,
net->runReadyNetworkOperations();
// Last oplog entry.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
}
initialSyncer->join();
@@ -1224,7 +1236,7 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughOplogFetcherScheduleError) {
net->runReadyNetworkOperations();
// Last oplog entry.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
net->runReadyNetworkOperations();
}
@@ -1255,7 +1267,7 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughOplogFetcherCallbackError) {
// Last oplog entry.
net->scheduleSuccessfulResponse(
- makeCursorResponse(0LL, _options.localOplogNS, {makeOplogEntry(1)}));
+ makeCursorResponse(0LL, _options.localOplogNS, {makeOplogEntryObj(1)}));
net->runReadyNetworkOperations();
assertRemoteCommandNameEquals(
@@ -1299,7 +1311,7 @@ TEST_F(InitialSyncerTest,
auto request =
assertRemoteCommandNameEquals("find",
net->scheduleSuccessfulResponse(makeCursorResponse(
- 0LL, _options.localOplogNS, {makeOplogEntry(1)})));
+ 0LL, _options.localOplogNS, {makeOplogEntryObj(1)})));
ASSERT_EQUALS(1, request.cmdObj.getIntField("limit"));
net->runReadyNetworkOperations();
@@ -1312,12 +1324,12 @@ TEST_F(InitialSyncerTest,
request =
assertRemoteCommandNameEquals("find",
net->scheduleSuccessfulResponse(makeCursorResponse(
- 0LL, _options.localOplogNS, {makeOplogEntry(1)})));
+ 0LL, _options.localOplogNS, {makeOplogEntryObj(1)})));
ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
net->runReadyNetworkOperations();
// Second last oplog entry fetcher.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
// Last rollback checker replSetGetRBID command.
assertRemoteCommandNameEquals(
@@ -1326,8 +1338,7 @@ TEST_F(InitialSyncerTest,
}
initialSyncer->join();
- ASSERT_EQUALS(OplogEntry(makeOplogEntry(1)).getOpTime(),
- unittest::assertGet(_lastApplied).opTime);
+ ASSERT_EQUALS(makeOplogEntry(1).getOpTime(), unittest::assertGet(_lastApplied).opTime);
}
TEST_F(
@@ -1348,7 +1359,7 @@ TEST_F(
net->runReadyNetworkOperations();
// Last oplog entry.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
assertRemoteCommandNameEquals(
"listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
@@ -1356,17 +1367,19 @@ TEST_F(
// Oplog tailing query.
// Simulate cursor closing on sync source.
- auto request = assertRemoteCommandNameEquals(
- "find",
- net->scheduleSuccessfulResponse(makeCursorResponse(
- 0LL,
- _options.localOplogNS,
- {makeOplogEntry(1), makeOplogEntry(2, "c"), makeOplogEntry(3, "c")})));
+ auto request =
+ assertRemoteCommandNameEquals("find",
+ net->scheduleSuccessfulResponse(makeCursorResponse(
+ 0LL,
+ _options.localOplogNS,
+ {makeOplogEntryObj(1),
+ makeOplogEntryObj(2, OpTypeEnum::kCommand),
+ makeOplogEntryObj(3, OpTypeEnum::kCommand)})));
ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
net->runReadyNetworkOperations();
// Second last oplog entry fetcher.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(3)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(3)});
// Last rollback checker replSetGetRBID command.
assertRemoteCommandNameEquals(
@@ -1375,8 +1388,7 @@ TEST_F(
}
initialSyncer->join();
- ASSERT_EQUALS(OplogEntry(makeOplogEntry(3)).getOpTime(),
- unittest::assertGet(_lastApplied).opTime);
+ ASSERT_EQUALS(makeOplogEntry(3).getOpTime(), unittest::assertGet(_lastApplied).opTime);
}
TEST_F(
@@ -1397,7 +1409,7 @@ TEST_F(
net->runReadyNetworkOperations();
// Last oplog entry.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
assertRemoteCommandNameEquals(
"listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
@@ -1405,19 +1417,21 @@ TEST_F(
// Oplog tailing query.
// Simulate cursor closing on sync source.
- auto request = assertRemoteCommandNameEquals(
- "find",
- net->scheduleSuccessfulResponse(makeCursorResponse(
- 0LL,
- _options.localOplogNS,
- {makeOplogEntry(1), makeOplogEntry(2, "c"), makeOplogEntry(3, "c")})));
+ auto request =
+ assertRemoteCommandNameEquals("find",
+ net->scheduleSuccessfulResponse(makeCursorResponse(
+ 0LL,
+ _options.localOplogNS,
+ {makeOplogEntryObj(1),
+ makeOplogEntryObj(2, OpTypeEnum::kCommand),
+ makeOplogEntryObj(3, OpTypeEnum::kCommand)})));
ASSERT_TRUE(request.cmdObj.getBoolField("tailable"));
net->runReadyNetworkOperations();
// Second last oplog entry fetcher.
// Return an oplog entry with an optime that is more recent than what the completed
// OplogFetcher has read from the sync source.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(4)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(4)});
}
initialSyncer->join();
@@ -1453,7 +1467,7 @@ TEST_F(InitialSyncerTest,
net->runReadyNetworkOperations();
// Last oplog entry.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
// InitialSyncer shuts down OplogFetcher when it fails to schedule DatabasesCloner
// so we should not expect any network requests in the queue.
@@ -1490,7 +1504,7 @@ TEST_F(InitialSyncerTest,
net->runReadyNetworkOperations();
// Last oplog entry.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
// DatabasesCloner's first remote command - listDatabases
assertRemoteCommandNameEquals(
@@ -1524,7 +1538,7 @@ TEST_F(InitialSyncerTest, InitialSyncerIgnoresLocalDatabasesWhenCloningDatabases
net->runReadyNetworkOperations();
// Last oplog entry.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
// DatabasesCloner's first remote command - listDatabases
assertRemoteCommandNameEquals(
@@ -1584,7 +1598,7 @@ TEST_F(InitialSyncerTest,
net->runReadyNetworkOperations();
// Last oplog entry.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
// DatabasesCloner's first remote command - listDatabases
assertRemoteCommandNameEquals(
@@ -1650,7 +1664,7 @@ TEST_F(InitialSyncerTest, InitialSyncerCancelsBothOplogFetcherAndDatabasesCloner
net->runReadyNetworkOperations();
// Last oplog entry.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
}
ASSERT_OK(initialSyncer->shutdown());
@@ -1696,7 +1710,7 @@ TEST_F(InitialSyncerTest,
net->runReadyNetworkOperations();
// Last oplog entry.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
// Quickest path to a successful DatabasesCloner completion is to respond to the
// listDatabases with an empty list of database names.
@@ -1732,7 +1746,7 @@ TEST_F(InitialSyncerTest,
net->runReadyNetworkOperations();
// Last oplog entry.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
// Quickest path to a successful DatabasesCloner completion is to respond to the
// listDatabases with an empty list of database names.
@@ -1784,7 +1798,7 @@ TEST_F(InitialSyncerTest,
net->runReadyNetworkOperations();
// Last oplog entry.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
// Quickest path to a successful DatabasesCloner completion is to respond to the
// listDatabases with an empty list of database names.
@@ -1831,7 +1845,7 @@ TEST_F(InitialSyncerTest,
net->runReadyNetworkOperations();
// Last oplog entry.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
// Quickest path to a successful DatabasesCloner completion is to respond to the
// listDatabases with an empty list of database names.
@@ -1879,7 +1893,7 @@ TEST_F(
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
- auto oplogEntry = makeOplogEntry(1);
+ auto oplogEntry = makeOplogEntryObj(1);
auto net = getNet();
{
executor::NetworkInterfaceMock::InNetworkGuard guard(net);
@@ -1937,7 +1951,7 @@ TEST_F(InitialSyncerTest,
net->runReadyNetworkOperations();
// Last oplog entry.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});
// Quickest path to a successful DatabasesCloner completion is to respond to the
// listDatabases with an empty list of database names.
@@ -1953,7 +1967,7 @@ TEST_F(InitialSyncerTest,
net->blackHole(noi);
// Second last oplog entry fetcher.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
// _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after
// setting the completion status.
@@ -1984,7 +1998,7 @@ TEST_F(
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
- auto oplogEntry = makeOplogEntry(1);
+ auto oplogEntry = makeOplogEntryObj(1);
auto net = getNet();
{
executor::NetworkInterfaceMock::InNetworkGuard guard(net);
@@ -2044,7 +2058,7 @@ TEST_F(
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
- auto oplogEntry = makeOplogEntry(1);
+ auto oplogEntry = makeOplogEntryObj(1);
auto net = getNet();
{
executor::NetworkInterfaceMock::InNetworkGuard guard(net);
@@ -2110,7 +2124,7 @@ TEST_F(
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
- auto oplogEntry = makeOplogEntry(1);
+ auto oplogEntry = makeOplogEntryObj(1);
auto net = getNet();
{
executor::NetworkInterfaceMock::InNetworkGuard guard(net);
@@ -2158,7 +2172,7 @@ TEST_F(
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
- auto oplogEntry = makeOplogEntry(1);
+ auto oplogEntry = makeOplogEntryObj(1);
auto net = getNet();
{
executor::NetworkInterfaceMock::InNetworkGuard guard(net);
@@ -2211,7 +2225,7 @@ TEST_F(InitialSyncerTest, InitialSyncerCancelsLastRollbackCheckerOnShutdown) {
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
- auto oplogEntry = makeOplogEntry(1);
+ auto oplogEntry = makeOplogEntryObj(1);
auto net = getNet();
{
executor::NetworkInterfaceMock::InNetworkGuard guard(net);
@@ -2265,7 +2279,7 @@ TEST_F(InitialSyncerTest, InitialSyncerCancelsLastRollbackCheckerOnOplogFetcherC
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
- auto oplogEntry = makeOplogEntry(1);
+ auto oplogEntry = makeOplogEntryObj(1);
auto net = getNet();
{
executor::NetworkInterfaceMock::InNetworkGuard guard(net);
@@ -2323,7 +2337,7 @@ TEST_F(InitialSyncerTest,
_syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
- auto oplogEntry = makeOplogEntry(1);
+ auto oplogEntry = makeOplogEntryObj(1);
auto net = getNet();
int baseRollbackId = 1;
{
@@ -2384,7 +2398,7 @@ TEST_F(InitialSyncerTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfter
net->runReadyNetworkOperations();
// Last oplog entry.
- processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
+ processSuccessfulLastOplogEntryFetcherResponse({oplogEntry.toBSON()});
// Instead of fast forwarding to DatabasesCloner completion by returning an empty list of
// database names, we'll simulate copying a single database with a single collection on the
@@ -2435,7 +2449,7 @@ TEST_F(InitialSyncerTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfter
ASSERT_EQUALS(nss.db(), request.dbname);
// Second last oplog entry fetcher.
- processSuccessfulLastOplogEntryFetcherResponse({oplogEntry});
+ processSuccessfulLastOplogEntryFetcherResponse({oplogEntry.toBSON()});
// Last rollback checker replSetGetRBID command.
request = assertRemoteCommandNameEquals(
@@ -2448,8 +2462,8 @@ TEST_F(InitialSyncerTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfter
}
initialSyncer->join();
- ASSERT_EQUALS(OplogEntry(oplogEntry).getOpTime(), unittest::assertGet(_lastApplied).opTime);
- ASSERT_EQUALS(oplogEntry["h"].Long(), unittest::assertGet(_lastApplied).value);
+ ASSERT_EQUALS(oplogEntry.getOpTime(), unittest::assertGet(_lastApplied).opTime);
+ ASSERT_EQUALS(oplogEntry.getHash(), unittest::assertGet(_lastApplied).value);
ASSERT_FALSE(_storageInterface->getInitialSyncFlag(opCtx.get()));
}
@@ -2472,7 +2486,7 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetNextApplierBatchScheduleE
net->runReadyNetworkOperations();
// Last oplog entry.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
// Quickest path to a successful DatabasesCloner completion is to respond to the
// listDatabases with an empty list of database names.
@@ -2494,7 +2508,7 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetNextApplierBatchScheduleE
_executorProxy->shouldFailScheduleWorkRequest = []() { return true; };
// Second last oplog entry fetcher.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});
// _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after
// setting the completion status.
@@ -2526,7 +2540,7 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughSecondGetNextApplierBatchSch
net->runReadyNetworkOperations();
// Last oplog entry.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
// Quickest path to a successful DatabasesCloner completion is to respond to the
// listDatabases with an empty list of database names.
@@ -2548,7 +2562,7 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughSecondGetNextApplierBatchSch
_executorProxy->shouldFailScheduleWorkAtRequest = []() { return true; };
// Second last oplog entry fetcher.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});
// _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after
// setting the completion status.
@@ -2580,7 +2594,7 @@ TEST_F(InitialSyncerTest, InitialSyncerCancelsGetNextApplierBatchOnShutdown) {
net->runReadyNetworkOperations();
// Last oplog entry.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
// Quickest path to a successful DatabasesCloner completion is to respond to the
// listDatabases with an empty list of database names.
@@ -2597,7 +2611,7 @@ TEST_F(InitialSyncerTest, InitialSyncerCancelsGetNextApplierBatchOnShutdown) {
net->blackHole(noi);
// Second last oplog entry fetcher.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});
// Since we black holed OplogFetcher's find request, _getNextApplierBatch_inlock() will
// not return any operations for us to apply, leading to _getNextApplierBatchCallback()
@@ -2622,9 +2636,9 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetNextApplierBatchInLockErr
// _getNextApplierBatch_inlock() returns BadValue when it gets an oplog entry with an unexpected
// version (not OplogEntry::kOplogVersion).
- auto oplogEntry = makeOplogEntry(1);
+ auto oplogEntry = makeOplogEntryObj(1);
auto oplogEntryWithInconsistentVersion =
- makeOplogEntry(2, "i", OplogEntry::kOplogVersion + 100);
+ makeOplogEntryObj(2, OpTypeEnum::kInsert, OplogEntry::kOplogVersion + 100);
auto net = getNet();
int baseRollbackId = 1;
@@ -2636,7 +2650,7 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetNextApplierBatchInLockErr
net->runReadyNetworkOperations();
// Last oplog entry.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
// Quickest path to a successful DatabasesCloner completion is to respond to the
// listDatabases with an empty list of database names.
@@ -2654,7 +2668,7 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughGetNextApplierBatchInLockErr
net->runReadyNetworkOperations();
// Second last oplog entry fetcher.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});
// _getNextApplierBatchCallback() will shut down the OplogFetcher after setting the
// completion status.
@@ -2680,9 +2694,9 @@ TEST_F(
// _getNextApplierBatch_inlock() returns BadValue when it gets an oplog entry with an unexpected
// version (not OplogEntry::kOplogVersion).
- auto oplogEntry = makeOplogEntry(1);
+ auto oplogEntry = makeOplogEntryObj(1);
auto oplogEntryWithInconsistentVersion =
- makeOplogEntry(2, "i", OplogEntry::kOplogVersion + 100);
+ makeOplogEntryObj(2, OpTypeEnum::kInsert, OplogEntry::kOplogVersion + 100);
// Enable 'rsSyncApplyStop' so that _getNextApplierBatch_inlock() returns an empty batch of
// operations instead of a batch containing an oplog entry with a bad version.
@@ -2700,7 +2714,7 @@ TEST_F(
net->runReadyNetworkOperations();
// Last oplog entry.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
// Quickest path to a successful DatabasesCloner completion is to respond to the
// listDatabases with an empty list of database names.
@@ -2717,7 +2731,7 @@ TEST_F(
net->runReadyNetworkOperations();
// Second last oplog entry fetcher.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});
// Since the 'rsSyncApplyStop' fail point is enabled, InitialSyncer will get an empty
// batch of operations from _getNextApplierBatch_inlock() even though the oplog buffer
@@ -2755,7 +2769,7 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughMultiApplierScheduleError) {
net->runReadyNetworkOperations();
// Last oplog entry.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
// Quickest path to a successful DatabasesCloner completion is to respond to the
// listDatabases with an empty list of database names.
@@ -2771,14 +2785,15 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughMultiApplierScheduleError) {
auto oplogFetcherNoi = noi;
// Second last oplog entry fetcher.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});
// _getNextApplierBatchCallback() should have rescheduled itself.
// We'll insert some operations in the oplog buffer so that we'll attempt to schedule
// MultiApplier next time _getNextApplierBatchCallback() runs.
net->scheduleSuccessfulResponse(
oplogFetcherNoi,
- makeCursorResponse(1LL, _options.localOplogNS, {makeOplogEntry(1), makeOplogEntry(2)}));
+ makeCursorResponse(
+ 1LL, _options.localOplogNS, {makeOplogEntryObj(1), makeOplogEntryObj(2)}));
net->runReadyNetworkOperations();
// Ignore OplogFetcher's getMore request.
@@ -2825,7 +2840,7 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughMultiApplierCallbackError) {
net->runReadyNetworkOperations();
// Last oplog entry.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
// Quickest path to a successful DatabasesCloner completion is to respond to the
// listDatabases with an empty list of database names.
@@ -2834,14 +2849,14 @@ TEST_F(InitialSyncerTest, InitialSyncerPassesThroughMultiApplierCallbackError) {
net->runReadyNetworkOperations();
// OplogFetcher's oplog tailing query. Provide enough operations to trigger MultiApplier.
- auto request = net->scheduleSuccessfulResponse(
- makeCursorResponse(1LL, _options.localOplogNS, {makeOplogEntry(1), makeOplogEntry(2)}));
+ auto request = net->scheduleSuccessfulResponse(makeCursorResponse(
+ 1LL, _options.localOplogNS, {makeOplogEntryObj(1), makeOplogEntryObj(2)}));
assertRemoteCommandNameEquals("find", request);
ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
net->runReadyNetworkOperations();
// Second last oplog entry fetcher.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});
// _multiApplierCallback() will shut down the OplogFetcher after setting the completion
// status.
@@ -2871,7 +2886,7 @@ TEST_F(InitialSyncerTest, InitialSyncerCancelsGetNextApplierBatchCallbackOnOplog
net->runReadyNetworkOperations();
// Last oplog entry.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
// Quickest path to a successful DatabasesCloner completion is to respond to the
// listDatabases with an empty list of database names.
@@ -2887,7 +2902,7 @@ TEST_F(InitialSyncerTest, InitialSyncerCancelsGetNextApplierBatchCallbackOnOplog
auto oplogFetcherNoi = noi;
// Second last oplog entry fetcher.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});
// Send error to _oplogFetcherCallback().
net->scheduleErrorResponse(oplogFetcherNoi,
@@ -2924,7 +2939,7 @@ TEST_F(InitialSyncerTest,
net->runReadyNetworkOperations();
// Last oplog entry.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
// Quickest path to a successful DatabasesCloner completion is to respond to the
// listDatabases with an empty list of database names.
@@ -2934,14 +2949,14 @@ TEST_F(InitialSyncerTest,
// OplogFetcher's oplog tailing query. Response has enough operations to reach
// end timestamp.
- auto request = net->scheduleSuccessfulResponse(
- makeCursorResponse(1LL, _options.localOplogNS, {makeOplogEntry(1), lastOp}));
+ auto request = net->scheduleSuccessfulResponse(makeCursorResponse(
+ 1LL, _options.localOplogNS, {makeOplogEntryObj(1), lastOp.toBSON()}));
assertRemoteCommandNameEquals("find", request);
ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
net->runReadyNetworkOperations();
// Second last oplog entry fetcher.
- processSuccessfulLastOplogEntryFetcherResponse({lastOp});
+ processSuccessfulLastOplogEntryFetcherResponse({lastOp.toBSON()});
// Black hole OplogFetcher's getMore request.
auto noi = net->getNextReadyRequest();
@@ -2962,8 +2977,8 @@ TEST_F(InitialSyncerTest,
}
initialSyncer->join();
- ASSERT_EQUALS(OplogEntry(lastOp).getOpTime(), unittest::assertGet(_lastApplied).opTime);
- ASSERT_EQUALS(lastOp["h"].Long(), unittest::assertGet(_lastApplied).value);
+ ASSERT_EQUALS(lastOp.getOpTime(), unittest::assertGet(_lastApplied).opTime);
+ ASSERT_EQUALS(lastOp.getHash(), unittest::assertGet(_lastApplied).value);
}
TEST_F(InitialSyncerTest,
@@ -2977,7 +2992,7 @@ TEST_F(InitialSyncerTest,
// To make InitialSyncer apply multiple batches, we make the third and last operation a command
// so that it will go into a separate batch from the second operation. First operation is the
// last fetched entry before data cloning and is not applied.
- auto lastOp = makeOplogEntry(3, "c");
+ auto lastOp = makeOplogEntry(3, OpTypeEnum::kCommand);
auto net = getNet();
int baseRollbackId = 1;
@@ -2989,7 +3004,7 @@ TEST_F(InitialSyncerTest,
net->runReadyNetworkOperations();
// Last oplog entry.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
// Instead of fast forwarding to DatabasesCloner completion by returning an empty list of
// database names, we'll simulate copying a single database with a single collection on the
@@ -3002,8 +3017,10 @@ TEST_F(InitialSyncerTest,
// OplogFetcher's oplog tailing query. Response has enough operations to reach
// end timestamp.
- request = net->scheduleSuccessfulResponse(makeCursorResponse(
- 1LL, _options.localOplogNS, {makeOplogEntry(1), makeOplogEntry(2), lastOp}));
+ request = net->scheduleSuccessfulResponse(
+ makeCursorResponse(1LL,
+ _options.localOplogNS,
+ {makeOplogEntryObj(1), makeOplogEntryObj(2), lastOp.toBSON()}));
assertRemoteCommandNameEquals("find", request);
ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
net->runReadyNetworkOperations();
@@ -3045,7 +3062,7 @@ TEST_F(InitialSyncerTest,
ASSERT_EQUALS(nss.db(), request.dbname);
// Second last oplog entry fetcher.
- processSuccessfulLastOplogEntryFetcherResponse({lastOp});
+ processSuccessfulLastOplogEntryFetcherResponse({lastOp.toBSON()});
// Last rollback ID.
request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
@@ -3060,8 +3077,8 @@ TEST_F(InitialSyncerTest,
}
initialSyncer->join();
- ASSERT_EQUALS(OplogEntry(lastOp).getOpTime(), unittest::assertGet(_lastApplied).opTime);
- ASSERT_EQUALS(lastOp["h"].Long(), unittest::assertGet(_lastApplied).value);
+ ASSERT_EQUALS(lastOp.getOpTime(), unittest::assertGet(_lastApplied).opTime);
+ ASSERT_EQUALS(lastOp.getHash(), unittest::assertGet(_lastApplied).value);
}
TEST_F(
@@ -3095,7 +3112,7 @@ TEST_F(
ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
// Use command for third and last operation to ensure we have two batches to apply.
- auto lastOp = makeOplogEntry(3, "c");
+ auto lastOp = makeOplogEntry(3, OpTypeEnum::kCommand);
auto net = getNet();
int baseRollbackId = 1;
@@ -3107,7 +3124,7 @@ TEST_F(
net->runReadyNetworkOperations();
// Last oplog entry.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
// Quickest path to a successful DatabasesCloner completion is to respond to the
// listDatabases with an empty list of database names.
@@ -3117,8 +3134,10 @@ TEST_F(
// OplogFetcher's oplog tailing query. Response has enough operations to reach
// end timestamp.
- auto request = net->scheduleSuccessfulResponse(makeCursorResponse(
- 1LL, _options.localOplogNS, {makeOplogEntry(1), makeOplogEntry(2), lastOp}));
+ auto request = net->scheduleSuccessfulResponse(
+ makeCursorResponse(1LL,
+ _options.localOplogNS,
+ {makeOplogEntryObj(1), makeOplogEntryObj(2), lastOp.toBSON()}));
assertRemoteCommandNameEquals("find", request);
ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
net->runReadyNetworkOperations();
@@ -3126,7 +3145,7 @@ TEST_F(
// Second last oplog entry fetcher.
// Send oplog entry with timestamp 2. InitialSyncer will update this end timestamp after
// applying the first batch.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(2)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});
// Black hole OplogFetcher's getMore request.
auto noi = net->getNextReadyRequest();
@@ -3135,7 +3154,7 @@ TEST_F(
net->blackHole(noi);
// Third last oplog entry fetcher.
- processSuccessfulLastOplogEntryFetcherResponse({lastOp});
+ processSuccessfulLastOplogEntryFetcherResponse({lastOp.toBSON()});
// Last rollback ID.
request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
@@ -3150,8 +3169,8 @@ TEST_F(
}
initialSyncer->join();
- ASSERT_EQUALS(OplogEntry(lastOp).getOpTime(), unittest::assertGet(_lastApplied).opTime);
- ASSERT_EQUALS(lastOp["h"].Long(), unittest::assertGet(_lastApplied).value);
+ ASSERT_EQUALS(lastOp.getOpTime(), unittest::assertGet(_lastApplied).opTime);
+ ASSERT_EQUALS(lastOp.getHash(), unittest::assertGet(_lastApplied).value);
ASSERT_TRUE(fetchCountIncremented);
@@ -3194,7 +3213,7 @@ TEST_F(InitialSyncerTest, OplogOutOfOrderOnOplogFetchFinish) {
net->runReadyNetworkOperations();
// Last oplog entry.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
// Ignore listDatabases request.
auto noi = net->getNextReadyRequest();
@@ -3204,7 +3223,7 @@ TEST_F(InitialSyncerTest, OplogOutOfOrderOnOplogFetchFinish) {
// OplogFetcher's oplog tailing query.
request = net->scheduleSuccessfulResponse(
- makeCursorResponse(1LL, _options.localOplogNS, {makeOplogEntry(1)}));
+ makeCursorResponse(1LL, _options.localOplogNS, {makeOplogEntryObj(1)}));
assertRemoteCommandNameEquals("find", request);
ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
net->runReadyNetworkOperations();
@@ -3214,7 +3233,7 @@ TEST_F(InitialSyncerTest, OplogOutOfOrderOnOplogFetchFinish) {
// oplog entry has a timestamp of 1):
// (last=1), 5, 4
request = net->scheduleSuccessfulResponse(makeCursorResponse(
- 1LL, _options.localOplogNS, {makeOplogEntry(5), makeOplogEntry(4)}, false));
+ 1LL, _options.localOplogNS, {makeOplogEntryObj(5), makeOplogEntryObj(4)}, false));
assertRemoteCommandNameEquals("getMore", request);
net->runReadyNetworkOperations();
@@ -3245,7 +3264,7 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
net->runReadyNetworkOperations();
// Last oplog entry.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
}
log() << "Done playing first failed response";
@@ -3291,7 +3310,7 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
net->runReadyNetworkOperations();
// Last oplog entry.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(1)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
}
log() << "Done playing first successful response";
@@ -3334,13 +3353,13 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
// Ignore oplog tailing query.
request = net->scheduleSuccessfulResponse(makeCursorResponse(1LL,
_options.localOplogNS,
- {makeOplogEntry(1),
- makeOplogEntry(2),
- makeOplogEntry(3),
- makeOplogEntry(4),
- makeOplogEntry(5),
- makeOplogEntry(6),
- makeOplogEntry(7)}));
+ {makeOplogEntryObj(1),
+ makeOplogEntryObj(2),
+ makeOplogEntryObj(3),
+ makeOplogEntryObj(4),
+ makeOplogEntryObj(5),
+ makeOplogEntryObj(6),
+ makeOplogEntryObj(7)}));
assertRemoteCommandNameEquals("find", request);
ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
net->runReadyNetworkOperations();
@@ -3385,7 +3404,7 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
// Second last oplog entry fetcher.
// Send oplog entry with timestamp 2. InitialSyncer will update this end timestamp after
// applying the first batch.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntry(7)});
+ processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(7)});
}
log() << "Done playing all but last successful response";
@@ -3447,8 +3466,7 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
log() << "waiting for initial sync to verify it completed OK";
initialSyncer->join();
- ASSERT_EQUALS(OplogEntry(makeOplogEntry(7)).getOpTime(),
- unittest::assertGet(_lastApplied).opTime);
+ ASSERT_EQUALS(makeOplogEntry(7).getOpTime(), unittest::assertGet(_lastApplied).opTime);
progress = initialSyncer->getInitialSyncProgress();
log() << "Progress at end: " << progress;
diff --git a/src/mongo/db/repl/multiapplier_test.cpp b/src/mongo/db/repl/multiapplier_test.cpp
index dcbd6979adf..45f387ecd0c 100644
--- a/src/mongo/db/repl/multiapplier_test.cpp
+++ b/src/mongo/db/repl/multiapplier_test.cpp
@@ -73,8 +73,16 @@ Status applyOperation(MultiApplier::OperationPtrs*) {
return Status::OK();
};
+/**
+ * Generates oplog entries with the given number used for the timestamp.
+ */
+OplogEntry makeOplogEntry(int ts) {
+ return OplogEntry(
+ OpTime(Timestamp(ts, 1), 1), 1LL, OpTypeEnum::kNoop, NamespaceString("a.a"), BSONObj());
+}
+
TEST_F(MultiApplierTest, InvalidConstruction) {
- const MultiApplier::Operations operations{OplogEntry(BSON("ts" << Timestamp(Seconds(123), 0)))};
+ const MultiApplier::Operations operations{makeOplogEntry(123)};
auto multiApply = [](OperationContext*,
MultiApplier::Operations,
MultiApplier::ApplyOperationFn) -> StatusWith<OpTime> {
@@ -97,21 +105,6 @@ TEST_F(MultiApplierTest, InvalidConstruction) {
ErrorCodes::BadValue,
"empty list of operations");
- // Last operation missing timestamp field.
- ASSERT_THROWS_CODE_AND_WHAT(
- MultiApplier(&getExecutor(), {OplogEntry(BSONObj())}, applyOperation, multiApply, callback),
- UserException,
- ErrorCodes::FailedToParse,
- "last operation missing 'ts' field: {}");
-
- // "ts" field in last operation not a timestamp.
- ASSERT_THROWS_CODE_AND_WHAT(
- MultiApplier(
- &getExecutor(), {OplogEntry(BSON("ts" << 123))}, applyOperation, multiApply, callback),
- UserException,
- ErrorCodes::TypeMismatch,
- "'ts' in last operation not a timestamp: { ts: 123 }");
-
// Invalid apply operation function.
ASSERT_THROWS_CODE_AND_WHAT(
MultiApplier(
@@ -138,7 +131,7 @@ TEST_F(MultiApplierTest, InvalidConstruction) {
}
TEST_F(MultiApplierTest, MultiApplierTransitionsDirectlyToCompleteIfShutdownBeforeStarting) {
- const MultiApplier::Operations operations{OplogEntry(BSON("ts" << Timestamp(Seconds(123), 0)))};
+ const MultiApplier::Operations operations{makeOplogEntry(123)};
auto multiApply = [](OperationContext*,
MultiApplier::Operations,
@@ -153,7 +146,7 @@ TEST_F(MultiApplierTest, MultiApplierTransitionsDirectlyToCompleteIfShutdownBefo
}
TEST_F(MultiApplierTest, MultiApplierInvokesCallbackWithCallbackCanceledStatusUponCancellation) {
- const MultiApplier::Operations operations{OplogEntry(BSON("ts" << Timestamp(Seconds(123), 0)))};
+ const MultiApplier::Operations operations{makeOplogEntry(123)};
bool multiApplyInvoked = false;
auto multiApply = [&](OperationContext* opCtx,
@@ -190,7 +183,7 @@ TEST_F(MultiApplierTest, MultiApplierInvokesCallbackWithCallbackCanceledStatusUp
}
TEST_F(MultiApplierTest, MultiApplierPassesMultiApplyErrorToCallback) {
- const MultiApplier::Operations operations{OplogEntry(BSON("ts" << Timestamp(Seconds(123), 0)))};
+ const MultiApplier::Operations operations{makeOplogEntry(123)};
bool multiApplyInvoked = false;
Status multiApplyError(ErrorCodes::OperationFailed, "multi apply failed");
@@ -219,7 +212,7 @@ TEST_F(MultiApplierTest, MultiApplierPassesMultiApplyErrorToCallback) {
}
TEST_F(MultiApplierTest, MultiApplierCatchesMultiApplyExceptionAndConvertsToCallbackStatus) {
- const MultiApplier::Operations operations{OplogEntry(BSON("ts" << Timestamp(Seconds(123), 0)))};
+ const MultiApplier::Operations operations{makeOplogEntry(123)};
bool multiApplyInvoked = false;
Status multiApplyError(ErrorCodes::OperationFailed, "multi apply failed");
@@ -251,7 +244,7 @@ TEST_F(MultiApplierTest, MultiApplierCatchesMultiApplyExceptionAndConvertsToCall
TEST_F(
MultiApplierTest,
MultiApplierProvidesOperationContextToMultiApplyFunctionButDisposesBeforeInvokingFinishCallback) {
- const MultiApplier::Operations operations{OplogEntry(BSON("ts" << Timestamp(Seconds(123), 0)))};
+ const MultiApplier::Operations operations{makeOplogEntry(123)};
OperationContext* multiApplyTxn = nullptr;
MultiApplier::Operations operationsToApply;
@@ -305,7 +298,7 @@ TEST_F(MultiApplierTest, MultiApplierResetsOnCompletionCallbackFunctionPointerUp
bool sharedCallbackStateDestroyed = false;
auto sharedCallbackData = std::make_shared<SharedCallbackState>(&sharedCallbackStateDestroyed);
- const MultiApplier::Operations operations{OplogEntry(BSON("ts" << Timestamp(Seconds(123), 0)))};
+ const MultiApplier::Operations operations{makeOplogEntry(123)};
auto multiApply = [&](OperationContext*,
MultiApplier::Operations operations,
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp
index 7ec2db33ab1..130543160bf 100644
--- a/src/mongo/db/repl/oplog_entry.cpp
+++ b/src/mongo/db/repl/oplog_entry.cpp
@@ -33,78 +33,148 @@
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/util/log.h"
namespace mongo {
namespace repl {
+namespace {
+
+OplogEntry::CommandType parseCommandType(const BSONObj& objectField) {
+ StringData commandString(objectField.firstElementFieldName());
+ if (commandString == "create") {
+ return OplogEntry::CommandType::kCreate;
+ } else if (commandString == "renameCollection") {
+ return OplogEntry::CommandType::kRenameCollection;
+ } else if (commandString == "drop") {
+ return OplogEntry::CommandType::kDrop;
+ } else if (commandString == "collMod") {
+ return OplogEntry::CommandType::kCollMod;
+ } else if (commandString == "applyOps") {
+ return OplogEntry::CommandType::kApplyOps;
+ } else if (commandString == "dropDatabase") {
+ return OplogEntry::CommandType::kDropDatabase;
+ } else if (commandString == "emptycapped") {
+ return OplogEntry::CommandType::kEmptyCapped;
+ } else if (commandString == "convertToCapped") {
+ return OplogEntry::CommandType::kConvertToCapped;
+ } else if (commandString == "createIndex") {
+ return OplogEntry::CommandType::kCreateIndex;
+ } else if (commandString == "dropIndexes") {
+ return OplogEntry::CommandType::kDropIndexes;
+ } else if (commandString == "deleteIndexes") {
+ return OplogEntry::CommandType::kDropIndexes;
+ } else {
+ severe() << "Unknown oplog entry command type: " << commandString
+ << " Object field: " << redact(objectField);
+ fassertFailedNoTrace(40444);
+ }
+ MONGO_UNREACHABLE;
+}
+
+} // namespace
+
const int OplogEntry::kOplogVersion = 2;
-OplogEntry::OplogEntry(BSONObj rawInput) : raw(std::move(rawInput)) {
- if (MONGO_unlikely(!raw.isOwned())) {
- raw = raw.copy();
+// Static
+StatusWith<OplogEntry> OplogEntry::parse(const BSONObj& object) {
+ try {
+ return OplogEntry(object);
+ } catch (...) {
+ return exceptionToStatus();
}
+ MONGO_UNREACHABLE;
+}
- BSONElement version;
- for (auto elem : raw) {
- const auto name = elem.fieldNameStringData();
- if (name == "ns") {
- _ns = NamespaceString(elem.valuestrsafe());
- } else if (name == "op") {
- _opType = elem.valuestrsafe();
- } else if (name == "o2") {
- _o2 = elem.Obj();
- } else if (name == "ts") {
- _ts = elem.timestamp();
- } else if (name == "v") {
- version = elem;
- } else if (name == "o") {
- _o = elem.Obj();
- }
- }
+OplogEntry::OplogEntry(BSONObj rawInput)
+ : raw(std::move(rawInput)), _commandType(OplogEntry::CommandType::kNotCommand) {
+ raw = raw.getOwned();
+
+ parseProtected(IDLParserErrorContext("OplogEntryBase"), raw);
- _version = version.eoo() ? 1 : version.numberInt();
+ // Parse command type from 'o' and 'o2' fields.
+ if (isCommand()) {
+ _commandType = parseCommandType(getObject());
+ }
+}
+OplogEntry::OplogEntry(OpTime opTime,
+ long long hash,
+ OpTypeEnum opType,
+ NamespaceString nss,
+ int version,
+ const BSONObj& oField,
+ const BSONObj& o2Field) {
+ setTimestamp(opTime.getTimestamp());
+ setTerm(opTime.getTerm());
+ setHash(hash);
+ setOpType(opType);
+ setNamespace(nss);
+ setVersion(version);
+ setObject(oField);
+ setObject2(o2Field);
+
+ // This is necessary until we remove `raw` in SERVER-29200.
+ raw = toBSON();
}
+OplogEntry::OplogEntry(OpTime opTime,
+ long long hash,
+ OpTypeEnum opType,
+ NamespaceString nss,
+ int version,
+ const BSONObj& oField)
+ : OplogEntry(opTime, hash, opType, nss, version, oField, BSONObj()) {}
+
+OplogEntry::OplogEntry(
+ OpTime opTime, long long hash, OpTypeEnum opType, NamespaceString nss, const BSONObj& oField)
+ : OplogEntry(opTime, hash, opType, nss, OplogEntry::kOplogVersion, oField, BSONObj()) {}
+
+OplogEntry::OplogEntry(OpTime opTime,
+ long long hash,
+ OpTypeEnum opType,
+ NamespaceString nss,
+ const BSONObj& oField,
+ const BSONObj& o2Field)
+ : OplogEntry(opTime, hash, opType, nss, OplogEntry::kOplogVersion, oField, o2Field) {}
+
bool OplogEntry::isCommand() const {
- return getOpType()[0] == 'c';
+ return getOpType() == OpTypeEnum::kCommand;
}
bool OplogEntry::isCrudOpType() const {
- switch (getOpType()[0]) {
- case 'd':
- case 'i':
- case 'u':
- return getOpType()[1] == 0;
+ switch (getOpType()) {
+ case OpTypeEnum::kInsert:
+ case OpTypeEnum::kDelete:
+ case OpTypeEnum::kUpdate:
+ return true;
+ case OpTypeEnum::kCommand:
+ case OpTypeEnum::kNoop:
+ return false;
}
- return false;
-}
-
-bool OplogEntry::hasNamespace() const {
- return !getNamespace().isEmpty();
-}
-
-int OplogEntry::getVersion() const {
- return _version;
+ MONGO_UNREACHABLE;
}
BSONElement OplogEntry::getIdElement() const {
invariant(isCrudOpType());
- switch (getOpType()[0]) {
- case 'u':
- return getObject2()["_id"];
- case 'd':
- case 'i':
- return getObject()["_id"];
+ if (getOpType() == OpTypeEnum::kUpdate) {
+ return getObject2()->getField("_id");
+ } else {
+ return getObject()["_id"];
}
- MONGO_UNREACHABLE;
}
-OpTime OplogEntry::getOpTime() const {
- return fassertStatusOK(34436, OpTime::parseFromOplogEntry(raw));
+OplogEntry::CommandType OplogEntry::getCommandType() const {
+ invariant(isCommand());
+ invariant(_commandType != OplogEntry::CommandType::kNotCommand);
+ return _commandType;
}
-StringData OplogEntry::getCollectionName() const {
- return getNamespace().coll();
+OpTime OplogEntry::getOpTime() const {
+ long long term = OpTime::kUninitializedTerm;
+ if (getTerm()) {
+ term = getTerm().get();
+ }
+ return OpTime(getTimestamp(), term);
}
std::string OplogEntry::toString() const {
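
For context (illustrative only, not part of the commit): the parseCommandType() mapping above backs the new getCommandType() accessor, so callers can classify command entries without inspecting the 'o' field directly. A hedged sketch using the constructors from oplog_entry.h; the "test.$cmd" namespace is just an example value:

    #include "mongo/bson/bsonobjbuilder.h"
    #include "mongo/db/repl/oplog_entry.h"

    using namespace mongo;
    using namespace mongo::repl;

    void exampleCommandType() {
        OplogEntry dropEntry(OpTime(Timestamp(2, 1), 1),   // optime
                             2LL,                          // hash
                             OpTypeEnum::kCommand,         // command op
                             NamespaceString("test.$cmd"), // example namespace
                             BSON("drop" << "coll"));      // 'o' names the command
        invariant(dropEntry.isCommand());
        invariant(!dropEntry.isCrudOpType());
        invariant(dropEntry.getCommandType() == OplogEntry::CommandType::kDrop);
    }
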
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index 4e7b3d67839..7e7d5e03603 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -30,70 +30,104 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/simple_bsonobj_comparator.h"
-#include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/oplog_entry_gen.h"
#include "mongo/db/repl/optime.h"
namespace mongo {
namespace repl {
/**
- * A parsed oplog entry.
- *
- * This only includes the fields used by the code using this object at the time this was
- * written. As more code uses this, more fields should be added.
- *
- * All unowned members (such as StringDatas and BSONElements) point into the raw BSON.
- * All StringData members are guaranteed to be NUL terminated.
+ * A parsed oplog entry that inherits from the OplogEntryBase parsed by the IDL.
*/
-struct OplogEntry {
+class OplogEntry : public OplogEntryBase {
+public:
+ enum class CommandType {
+ kNotCommand,
+ kCreate,
+ kRenameCollection,
+ kDrop,
+ kCollMod,
+ kApplyOps,
+ kDropDatabase,
+ kEmptyCapped,
+ kConvertToCapped,
+ kCreateIndex,
+ kDropIndexes
+ };
+
// Current oplog version, should be the value of the v field in all oplog entries.
static const int kOplogVersion;
+ static StatusWith<OplogEntry> parse(const BSONObj& object);
+
+ OplogEntry(OpTime opTime,
+ long long hash,
+ OpTypeEnum opType,
+ NamespaceString nss,
+ int version,
+ const BSONObj& oField,
+ const BSONObj& o2Field);
+ OplogEntry(OpTime opTime,
+ long long hash,
+ OpTypeEnum opType,
+ NamespaceString nss,
+ int version,
+ const BSONObj& oField);
+ OplogEntry(OpTime opTime,
+ long long hash,
+ OpTypeEnum opType,
+ NamespaceString nss,
+ const BSONObj& oField);
+ OplogEntry(OpTime opTime,
+ long long hash,
+ OpTypeEnum opType,
+ NamespaceString nss,
+ const BSONObj& oField,
+ const BSONObj& o2Field);
+
+ // DEPRECATED: This constructor can throw. Use static parse method instead.
explicit OplogEntry(BSONObj raw);
+ OplogEntry() = delete;
+
// This member is not parsed from the BSON and is instead populated by fillWriterVectors.
bool isForCappedCollection = false;
+ /**
+ * Returns if the oplog entry is for a command operation.
+ */
bool isCommand() const;
- bool isCrudOpType() const;
- bool hasNamespace() const;
- int getVersion() const;
- BSONElement getIdElement() const;
- OpTime getOpTime() const;
- Seconds getTimestampSecs() const;
- StringData getCollectionName() const;
- std::string toString() const;
- BSONObj raw; // Owned.
+ /**
+ * Returns if the oplog entry is for a CRUD operation.
+ */
+ bool isCrudOpType() const;
- // TODO: Remove these when we add the IDL.
- const NamespaceString& getNamespace() const {
- return _ns;
- }
+ /**
+ * Returns the _id of the document being modified. Must be called on CRUD ops.
+ */
+ BSONElement getIdElement() const;
- StringData getOpType() const {
- return _opType;
- }
+ /**
+ * Returns the type of command of the oplog entry. Must be called on a command op.
+ */
+ CommandType getCommandType() const;
- Timestamp getTimestamp() const {
- return _ts;
- }
+ /**
+ * Returns the OpTime of the oplog entry.
+ */
+ OpTime getOpTime() const;
- const BSONObj& getObject() const {
- return _o;
- }
+ /**
+ * Serializes the oplog entry to a string.
+ */
+ std::string toString() const;
- const BSONObj& getObject2() const {
- return _o2;
- }
+ // TODO (SERVER-29200): make `raw` private. Do not add more direct uses of `raw`.
+ BSONObj raw; // Owned.
private:
- NamespaceString _ns;
- StringData _opType = "";
- int _version;
- Timestamp _ts;
- BSONObj _o;
- BSONObj _o2;
+ CommandType _commandType;
};
std::ostream& operator<<(std::ostream& s, const OplogEntry& o);
diff --git a/src/mongo/db/repl/oplog_entry.idl b/src/mongo/db/repl/oplog_entry.idl
new file mode 100644
index 00000000000..9611613d1d8
--- /dev/null
+++ b/src/mongo/db/repl/oplog_entry.idl
@@ -0,0 +1,97 @@
+# Copyright (C) 2017 MongoDB Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3,
+# as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the GNU Affero General Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+
+# Oplog Entry IDL File
+
+global:
+ cpp_namespace: "mongo::repl"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+
+enums:
+ OpType:
+ description: "The type of an operation in the oplog"
+ type: string
+ values:
+ kCommand: "c"
+ kInsert: "i"
+ kUpdate: "u"
+ kDelete: "d"
+ kNoop: "n"
+
+structs:
+ OplogEntryBase:
+ description: A document in which the server stores an oplog entry.
+ fields:
+ ts:
+ cpp_name: timestamp
+ type: timestamp
+ description: "The time when the oplog entry was created"
+ t:
+ cpp_name: term
+ type: long
+ optional: true # The term is optional for PV0 oplog entries.
+ description: "The term of the primary that created the oplog entry"
+ h:
+ cpp_name: hash
+ type: long
+ description: "The hash of the oplog entry"
+ v:
+ cpp_name: version
+ type: safeInt64
+ default: 1
+            description: "The version of the oplog entry format"
+ op:
+ cpp_name: opType
+ type: OpType
+ description: "The operation type"
+ ns:
+ cpp_name: namespace
+ type: namespacestring
+ description: "The namespace on which to apply the operation"
+ ui:
+ cpp_name: uuid
+ type: uuid
+ optional: true
+ description: "The UUID of the collection"
+ fromMigrate:
+ type: bool
+ optional: true
+            description: "Whether the operation was caused by a chunk migration"
+ o:
+ cpp_name: object
+ type: object
+ description: "The operation applied"
+ o2:
+ cpp_name: object2
+ type: object
+ optional: true
+ description: "Additional information about the operation applied"
+ _id:
+ type: objectid
+ optional: true
+ description: "An optional _id field for tests that manually insert oplog entries"
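For orientation, a BSON document that satisfies the schema above might look like the sketch below; the values are made up, and the optional fields ("t", "ui", "fromMigrate", "o2", "_id") may be omitted.

    // Sketch: a no-op oplog entry carrying the fields declared in oplog_entry.idl.
    BSONObj noopEntry = BSON("ts" << Timestamp(1495036800, 1)       // ts: when the entry was created
                                  << "t" << 1LL                     // t: term of the primary (optional)
                                  << "h" << 12345LL                 // h: hash of the entry
                                  << "v" << 2                       // v: oplog version (defaults to 1)
                                  << "op" << "n"                    // op: OpType ("c", "i", "u", "d", "n")
                                  << "ns" << "test.coll"            // ns: namespace the op applies to
                                  << "o" << BSON("msg" << "example"));  // o: the operation applied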
diff --git a/src/mongo/db/repl/sync_source_resolver_test.cpp b/src/mongo/db/repl/sync_source_resolver_test.cpp
index bab9d57bb89..da410b6082a 100644
--- a/src/mongo/db/repl/sync_source_resolver_test.cpp
+++ b/src/mongo/db/repl/sync_source_resolver_test.cpp
@@ -32,6 +32,7 @@
#include "mongo/db/cursor_id.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/sync_source_resolver.h"
#include "mongo/db/repl/sync_source_selector.h"
#include "mongo/db/repl/sync_source_selector_mock.h"
@@ -294,13 +295,21 @@ void _scheduleFirstOplogEntryFetcherResponse(executor::NetworkInterfaceMock* net
net->runReadyNetworkOperations();
}
+/**
+ * Generates a no-op oplog entry, as BSON, with the given timestamp and term.
+ */
+BSONObj _makeOplogEntry(Timestamp ts, long long term) {
+ return OplogEntry(OpTime(ts, term), 1LL, OpTypeEnum::kNoop, NamespaceString("a.a"), BSONObj())
+ .toBSON();
+}
+
void _scheduleFirstOplogEntryFetcherResponse(executor::NetworkInterfaceMock* net,
SyncSourceSelectorMock* selector,
HostAndPort currentSyncSource,
HostAndPort nextSyncSource,
Timestamp ts) {
_scheduleFirstOplogEntryFetcherResponse(
- net, selector, currentSyncSource, nextSyncSource, {BSON("ts" << ts << "t" << 0)});
+ net, selector, currentSyncSource, nextSyncSource, {_makeOplogEntry(ts, 0LL)});
}
void _scheduleRBIDResponse(executor::NetworkInterfaceMock* net,
@@ -631,7 +640,7 @@ void _scheduleRequiredOpTimeFetcherResponse(executor::NetworkInterfaceMock* net,
selector,
currentSyncSource,
requiredOpTime,
- {BSON("ts" << requiredOpTime.getTimestamp() << "t" << requiredOpTime.getTerm())});
+ {_makeOplogEntry(requiredOpTime.getTimestamp(), requiredOpTime.getTerm())});
}
const OpTime requiredOpTime(Timestamp(200, 1U), 1LL);
@@ -682,7 +691,7 @@ TEST_F(SyncSourceResolverTest,
_selector.get(),
candidate1,
requiredOpTime,
- {BSON("ts" << requiredOpTime.getTimestamp() << "t" << OpTime::kUninitializedTerm)});
+ {_makeOplogEntry(requiredOpTime.getTimestamp(), OpTime::kUninitializedTerm)});
ASSERT_TRUE(_resolver->isActive());
ASSERT_EQUALS(candidate1, _selector->getLastBlacklistedSyncSource_forTest());
@@ -791,7 +800,7 @@ TEST_F(SyncSourceResolverTest,
_selector.get(),
candidate1,
requiredOpTime,
- {BSON("ts" << requiredOpTime.getTimestamp() << "t" << requiredOpTime.getTerm() + 1)});
+ {_makeOplogEntry(requiredOpTime.getTimestamp(), requiredOpTime.getTerm() + 1)});
ASSERT_TRUE(_resolver->isActive());
ASSERT_EQUALS(candidate1, _selector->getLastBlacklistedSyncSource_forTest());
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 54364c9c839..68542125d4d 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -602,7 +602,7 @@ void fillWriterVectors(OperationContext* opCtx,
MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash);
}
- if (op.getOpType() == "i" && collProperties.isCapped) {
+ if (op.getOpType() == OpTypeEnum::kInsert && collProperties.isCapped) {
// Mark capped collection ops before storing them to ensure we do not attempt to
// bulk insert them.
op.isForCappedCollection = true;
@@ -869,15 +869,12 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx,
auto& entry = ops->back();
- if (!entry.raw.isEmpty()) {
- // check for oplog version change
- int curVersion = entry.getVersion();
- if (curVersion != OplogEntry::kOplogVersion) {
- severe() << "expected oplog version " << OplogEntry::kOplogVersion
- << " but found version " << curVersion
- << " in oplog entry: " << redact(entry.raw);
- fassertFailedNoTrace(18820);
- }
+ // check for oplog version change
+ int curVersion = entry.getVersion();
+ if (curVersion != OplogEntry::kOplogVersion) {
+ severe() << "expected oplog version " << OplogEntry::kOplogVersion << " but found version "
+ << curVersion << " in oplog entry: " << redact(entry.raw);
+ fassertFailedNoTrace(18820);
}
auto entryTime = Date_t::fromDurationSinceEpoch(Seconds(entry.getTimestamp().getSecs()));
@@ -893,8 +890,7 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx,
}
// Check for ops that must be processed one at a time.
- if (entry.raw.isEmpty() || // sentinel that network queue is drained.
- (entry.getOpType()[0] == 'c') || // commands.
+ if (entry.isCommand() || // commands.
// Index builds are achieved through the use of an insert op, not a command op.
// The following line is the same as what the insert code uses to detect an index build.
(!entry.getNamespace().isEmpty() && entry.getNamespace().coll() == "system.indexes")) {
@@ -1079,7 +1075,7 @@ Status multiSyncApply_noAbort(OperationContext* opCtx,
oplogEntriesIterator != oplogEntryPointers->end();
++oplogEntriesIterator) {
auto entry = *oplogEntriesIterator;
- if (entry->getOpType()[0] == 'i' && !entry->isForCappedCollection &&
+ if (entry->getOpType() == OpTypeEnum::kInsert && !entry->isForCappedCollection &&
oplogEntriesIterator > doNotGroupBeforePoint) {
// Attempt to group inserts if possible.
std::vector<BSONObj> toInsert;
@@ -1089,7 +1085,7 @@ Status multiSyncApply_noAbort(OperationContext* opCtx,
oplogEntriesIterator + 1,
oplogEntryPointers->end(),
[&](const OplogEntry* nextEntry) {
- return nextEntry->getOpType()[0] != 'i' || // Must be an insert.
+ return nextEntry->getOpType() != OpTypeEnum::kInsert || // Must be an insert.
nextEntry->getNamespace() !=
entry->getNamespace() || // Must be the same namespace.
// Must not create too large an object.
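Restated as a standalone predicate (the helper name below is hypothetical and the object-size cap from the original code is elided): a following entry may join the current insert batch only when both entries are inserts on the same namespace.

    // Hypothetical helper mirroring the typed checks used by multiSyncApply_noAbort above.
    bool canGroupWithCurrentInsert(const OplogEntry& entry, const OplogEntry& nextEntry) {
        return entry.getOpType() == OpTypeEnum::kInsert &&       // current batch head is an insert
            nextEntry.getOpType() == OpTypeEnum::kInsert &&      // next op must also be an insert
            nextEntry.getNamespace() == entry.getNamespace();    // on the same namespace
    }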
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index 9a83faa52d4..98ea1e71399 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -195,14 +195,7 @@ void createCollection(OperationContext* opCtx,
OplogEntry makeCommandOplogEntry(OpTime opTime,
const NamespaceString& nss,
const BSONObj& command) {
- BSONObjBuilder bob;
- bob.appendElements(opTime.toBSON());
- bob.append("h", 1LL);
- bob.append("v", 2);
- bob.append("op", "c");
- bob.append("ns", nss.getCommandNS().ns());
- bob.append("o", command);
- return OplogEntry(bob.obj());
+ return OplogEntry(opTime, 1LL, OpTypeEnum::kCommand, nss.getCommandNS(), 2, command);
}
/**
@@ -223,13 +216,7 @@ OplogEntry makeCreateCollectionOplogEntry(OpTime opTime,
OplogEntry makeInsertDocumentOplogEntry(OpTime opTime,
const NamespaceString& nss,
const BSONObj& documentToInsert) {
- BSONObjBuilder bob;
- bob.appendElements(opTime.toBSON());
- bob.append("h", 1LL);
- bob.append("op", "i");
- bob.append("ns", nss.ns());
- bob.append("o", documentToInsert);
- return OplogEntry(bob.obj());
+ return OplogEntry(opTime, 1LL, OpTypeEnum::kInsert, nss, documentToInsert);
}
/**
@@ -239,14 +226,7 @@ OplogEntry makeUpdateDocumentOplogEntry(OpTime opTime,
const NamespaceString& nss,
const BSONObj& documentToUpdate,
const BSONObj& updatedDocument) {
- BSONObjBuilder bob;
- bob.appendElements(opTime.toBSON());
- bob.append("h", 1LL);
- bob.append("op", "u");
- bob.append("ns", nss.ns());
- bob.append("o2", documentToUpdate);
- bob.append("o", updatedDocument);
- return OplogEntry(bob.obj());
+ return OplogEntry(opTime, 1LL, OpTypeEnum::kUpdate, nss, updatedDocument, documentToUpdate);
}
/**
@@ -649,10 +629,7 @@ TEST_F(SyncTailTest, MultiSyncApplyDisablesDocumentValidationWhileApplyingOperat
TEST_F(SyncTailTest, MultiSyncApplyPassesThroughSyncApplyErrorAfterFailingToApplyOperation) {
NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
- OplogEntry op(BSON("op"
- << "x"
- << "ns"
- << nss.ns()));
+ OplogEntry op(OpTime(Timestamp(1, 1), 1), 1LL, OpTypeEnum::kDelete, nss, BSONObj());
auto syncApply = [](OperationContext*, const BSONObj&, bool) -> Status {
return {ErrorCodes::OperationFailed, ""};
};
@@ -663,10 +640,7 @@ TEST_F(SyncTailTest, MultiSyncApplyPassesThroughSyncApplyErrorAfterFailingToAppl
TEST_F(SyncTailTest, MultiSyncApplyPassesThroughSyncApplyException) {
NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
- OplogEntry op(BSON("op"
- << "x"
- << "ns"
- << nss.ns()));
+ OplogEntry op(OpTime(Timestamp(1, 1), 1), 1LL, OpTypeEnum::kDelete, nss, BSONObj());
auto syncApply = [](OperationContext*, const BSONObj&, bool) -> Status {
uasserted(ErrorCodes::OperationFailed, "");
MONGO_UNREACHABLE;
@@ -679,12 +653,8 @@ TEST_F(SyncTailTest, MultiSyncApplyPassesThroughSyncApplyException) {
TEST_F(SyncTailTest, MultiSyncApplySortsOperationsStablyByNamespaceBeforeApplying) {
int x = 0;
auto makeOp = [&x](const char* ns) -> OplogEntry {
- return OplogEntry(BSON("op"
- << "x"
- << "ns"
- << ns
- << "x"
- << x++));
+ return OplogEntry(
+ OpTime(Timestamp(1, 1), 1), 1LL, OpTypeEnum::kDelete, NamespaceString(ns), BSONObj());
};
auto op1 = makeOp("test.t1");
auto op2 = makeOp("test.t1");
@@ -960,13 +930,15 @@ TEST_F(SyncTailTest, MultiInitialSyncApplyRetriesFailedUpdateIfDocumentIsAvailab
TEST_F(SyncTailTest, MultiInitialSyncApplyPassesThroughSyncApplyErrorAfterFailingToRetryBadOp) {
SyncTailWithLocalDocumentFetcher syncTail(BSON("_id" << 0 << "x" << 1));
NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
- OplogEntry op(BSON("op"
- << "x"
- << "ns"
- << nss.ns()));
+ OplogEntry op(OpTime(Timestamp(1, 1), 1),
+ 1LL,
+ OpTypeEnum::kUpdate,
+ nss,
+ BSON("_id" << 1),
+ BSON("_id" << 1));
MultiApplier::OperationPtrs ops = {&op};
AtomicUInt32 fetchCount(0);
- ASSERT_EQUALS(ErrorCodes::BadValue,
+ ASSERT_EQUALS(ErrorCodes::OperationFailed,
multiInitialSyncApply_noAbort(_opCtx.get(), &ops, &syncTail, &fetchCount));
ASSERT_EQUALS(fetchCount.load(), 1U);
}
diff --git a/src/mongo/util/uuid.h b/src/mongo/util/uuid.h
index 98e41c96672..4f86e26aa7c 100644
--- a/src/mongo/util/uuid.h
+++ b/src/mongo/util/uuid.h
@@ -41,6 +41,10 @@
namespace mongo {
+namespace repl {
+class OplogEntryBase;
+} // namespace repl
+
/**
* A UUID is a 128-bit unique identifier, per RFC 4122, v4, using
* a secure random number generator.
@@ -51,6 +55,7 @@ class UUID {
// Make the IDL generated parser a friend
friend class One_UUID;
friend class Logical_session_id;
+ friend class repl::OplogEntryBase;
public:
/**