summaryrefslogtreecommitdiff
path: root/src/mongo/db/s
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2017-11-17 16:39:51 -0500
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2017-11-20 15:01:11 -0500
commitc98068d7c836e2d36a862189733d903b24b02d9a (patch)
tree6d072ccacc3853e47a848e1fdf57d40196964bda /src/mongo/db/s
parent697db2c561f006cb9e1be9312e72c5072dd12530 (diff)
downloadmongo-c98068d7c836e2d36a862189733d903b24b02d9a.tar.gz
SERVER-32027 Fix unit-tests which rely on having a valid wallclock time
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp4
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.cpp98
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.h16
-rw-r--r--src/mongo/db/s/session_catalog_migration_source_test.cpp30
4 files changed, 65 insertions, 83 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 05436465040..b12c13b2a05 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -207,8 +207,8 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) {
auto const replCoord = repl::ReplicationCoordinator::get(opCtx);
if (replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet) {
- _sessionCatalogSource = stdx::make_unique<SessionCatalogMigrationSource>(_args.getNss());
- _sessionCatalogSource->init(opCtx);
+ _sessionCatalogSource =
+ stdx::make_unique<SessionCatalogMigrationSource>(opCtx, _args.getNss());
// Prime up the session migration source if there are oplog entries to migrate.
_sessionCatalogSource->fetchNextOplog(opCtx);
diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp
index 5328f0ac30e..9ca38970dc7 100644
--- a/src/mongo/db/s/session_catalog_migration_source.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source.cpp
@@ -110,10 +110,52 @@ repl::OplogEntry makeSentinelOplogEntry(OperationSessionInfo sessionInfo) {
kIncompleteHistoryStmtId); // statement id
}
-} // unnamed namespace
+} // namespace
-SessionCatalogMigrationSource::SessionCatalogMigrationSource(NamespaceString ns)
- : _ns(std::move(ns)) {}
+SessionCatalogMigrationSource::SessionCatalogMigrationSource(OperationContext* opCtx,
+ NamespaceString ns)
+ : _ns(std::move(ns)),
+ _rollbackIdAtInit(
+ uassertStatusOK(repl::ReplicationProcess::get(opCtx)->getRollbackID(opCtx))) {
+ // Sort is not needed for correctness. This is just for making it easier to write deterministic
+ // tests.
+ Query query;
+ query.sort(BSON("_id" << 1));
+
+ DBDirectClient client(opCtx);
+ auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), query);
+
+ while (cursor->more()) {
+ auto nextSession = SessionTxnRecord::parse(
+ IDLParserErrorContext("Session migration cloning"), cursor->next());
+ if (!nextSession.getLastWriteOpTime().isNull()) {
+ _sessionOplogIterators.push_back(
+ stdx::make_unique<SessionOplogIterator>(std::move(nextSession), _rollbackIdAtInit));
+ }
+ }
+
+ {
+ AutoGetCollection autoColl(opCtx, NamespaceString::kRsOplogNamespace, MODE_IX);
+ writeConflictRetry(
+ opCtx,
+ "session migration initialization majority commit barrier",
+ NamespaceString::kRsOplogNamespace.ns(),
+ [&] {
+ const auto message = BSON("sessionMigrateCloneStart" << _ns.ns());
+
+ WriteUnitOfWork wuow(opCtx);
+ opCtx->getClient()->getServiceContext()->getOpObserver()->onInternalOpMessage(
+ opCtx, _ns, {}, {}, message);
+ wuow.commit();
+ });
+ }
+
+ auto opTimeToWait = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ WriteConcernResult result;
+ WriteConcernOptions majority(
+ WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, 0);
+ uassertStatusOK(waitForWriteConcern(opCtx, opTimeToWait, majority, &result));
+}
bool SessionCatalogMigrationSource::hasMoreOplog() {
return _hasMoreOplogFromSessionCatalog() || _hasNewWrites();
@@ -188,7 +230,6 @@ repl::OplogEntry SessionCatalogMigrationSource::_getLastFetchedOplogFromSessionC
bool SessionCatalogMigrationSource::_fetchNextOplogFromSessionCatalog(OperationContext* opCtx) {
stdx::unique_lock<stdx::mutex> lk(_sessionCloneMutex);
- invariant(_alreadyInitialized);
if (!_lastFetchedOplogBuffer.empty()) {
_lastFetchedOplog = _lastFetchedOplogBuffer.back();
_lastFetchedOplogBuffer.pop_back();
@@ -230,7 +271,6 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op
{
stdx::lock_guard<stdx::mutex> lk(_newOplogMutex);
- invariant(_alreadyInitialized);
if (_newWriteOpTimeList.empty()) {
_lastFetchedNewWriteOplog.reset();
return false;
@@ -262,54 +302,6 @@ void SessionCatalogMigrationSource::notifyNewWriteOpTime(repl::OpTime opTime) {
_newWriteOpTimeList.push_back(opTime);
}
-void SessionCatalogMigrationSource::init(OperationContext* opCtx) {
- invariant(!_alreadyInitialized);
-
- _rollbackIdAtInit = uassertStatusOK(repl::ReplicationProcess::get(opCtx)->getRollbackID(opCtx));
-
- DBDirectClient client(opCtx);
- Query query;
- // Sort is not needed for correctness. This is just for making it easier to write deterministic
- // tests.
- query.sort(BSON("_id" << 1));
- auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), query);
-
- std::vector<std::unique_ptr<SessionOplogIterator>> sessionOplogIterators;
- while (cursor->more()) {
- auto nextSession = SessionTxnRecord::parse(
- IDLParserErrorContext("Session migration cloning"), cursor->next());
- if (!nextSession.getLastWriteOpTime().isNull()) {
- sessionOplogIterators.push_back(
- stdx::make_unique<SessionOplogIterator>(std::move(nextSession), _rollbackIdAtInit));
- }
- }
-
- {
- auto message = BSON("sessionMigrateCloneStart" << _ns.ns());
- AutoGetCollection autoColl(opCtx, NamespaceString::kRsOplogNamespace, MODE_IX);
- writeConflictRetry(
- opCtx,
- "session migration initialization majority commit barrier",
- NamespaceString::kRsOplogNamespace.ns(),
- [&] {
- WriteUnitOfWork wuow(opCtx);
- opCtx->getClient()->getServiceContext()->getOpObserver()->onInternalOpMessage(
- opCtx, _ns, {}, {}, message);
- wuow.commit();
- });
- }
-
- auto opTimeToWait = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
- WriteConcernResult result;
- WriteConcernOptions majority(
- WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, 0);
- uassertStatusOK(waitForWriteConcern(opCtx, opTimeToWait, majority, &result));
-
- stdx::lock_guard<stdx::mutex> lk(_sessionCloneMutex);
- _alreadyInitialized = true;
- _sessionOplogIterators.swap(sessionOplogIterators);
-}
-
SessionCatalogMigrationSource::SessionOplogIterator::SessionOplogIterator(
SessionTxnRecord txnRecord, int expectedRollbackId)
: _record(std::move(txnRecord)), _initialRollbackId(expectedRollbackId) {
diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h
index 2e5faf393c2..75d7a0073f1 100644
--- a/src/mongo/db/s/session_catalog_migration_source.h
+++ b/src/mongo/db/s/session_catalog_migration_source.h
@@ -83,9 +83,7 @@ public:
bool shouldWaitForMajority = false;
};
- explicit SessionCatalogMigrationSource(NamespaceString ns);
-
- void init(OperationContext* opCtx);
+ SessionCatalogMigrationSource(OperationContext* opCtx, NamespaceString ns);
/**
* Returns true if there are more oplog entries to fetch at this moment. Note that new writes
@@ -190,14 +188,16 @@ private:
*/
repl::OplogEntry _getLastFetchedNewWriteOplog();
+ // Namespace for which the migration is happening
const NamespaceString _ns;
- // Protects _alreadyInitialized, _sessionCatalogCursor, _sessionOplogIterators
- // _currentOplogIterator, _lastFetchedOplogBuffer, _lastFetchedOplog
- stdx::mutex _sessionCloneMutex;
- bool _alreadyInitialized = false;
+ // The rollback id just before migration started. This value is needed so that step-down
+ // followed by step-up situations can be discovered.
+ const int _rollbackIdAtInit;
- int _rollbackIdAtInit = 0;
+ // Protects _sessionCatalogCursor, _sessionOplogIterators, _currentOplogIterator,
+ // _lastFetchedOplogBuffer, _lastFetchedOplog
+ stdx::mutex _sessionCloneMutex;
// List of remaining session records that needs to be cloned.
std::vector<std::unique_ptr<SessionOplogIterator>> _sessionOplogIterators;
diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp
index 4b8c7ad3bdf..74f87bbba25 100644
--- a/src/mongo/db/s/session_catalog_migration_source_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp
@@ -101,8 +101,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
}
TEST_F(SessionCatalogMigrationSourceTest, NoSessionsToTransferShouldNotHaveOplog) {
- SessionCatalogMigrationSource migrationSource(kNs);
- migrationSource.init(opCtx());
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_FALSE(migrationSource.hasMoreOplog());
}
@@ -137,8 +136,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWrites) {
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
- SessionCatalogMigrationSource migrationSource(kNs);
- migrationSource.init(opCtx());
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
{
@@ -223,8 +221,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) {
insertOplogEntry(entry1b);
insertOplogEntry(entry2b);
- SessionCatalogMigrationSource migrationSource(kNs);
- migrationSource.init(opCtx());
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
auto checkNextBatch = [this, &migrationSource](const repl::OplogEntry& firstExpectedOplog,
@@ -320,8 +317,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
- SessionCatalogMigrationSource migrationSource(kNs);
- migrationSource.init(opCtx());
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
auto expectedSequece = {entry3, entry4, entry1, entry2};
@@ -382,8 +378,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) {
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(),
sessionRecord2.toBSON());
- SessionCatalogMigrationSource migrationSource(kNs);
- migrationSource.init(opCtx());
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_TRUE(migrationSource.hasMoreOplog());
@@ -438,8 +433,7 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) {
repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
insertOplogEntry(entry3);
- SessionCatalogMigrationSource migrationSource(kNs);
- migrationSource.init(opCtx());
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
migrationSource.notifyNewWriteOpTime(entry2.getOpTime());
@@ -474,8 +468,7 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) {
}
TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertIfOplogCannotBeFound) {
- SessionCatalogMigrationSource migrationSource(kNs);
- migrationSource.init(opCtx());
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
migrationSource.notifyNewWriteOpTime(repl::OpTime(Timestamp(100, 3), 1));
@@ -484,8 +477,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertIfOplogCannotBeFound) {
}
TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBufferWasDepleted) {
- SessionCatalogMigrationSource migrationSource(kNs);
- migrationSource.init(opCtx());
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
{
@@ -581,8 +573,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ReturnsDeadEndSentinelForIncompleteHis
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
- SessionCatalogMigrationSource migrationSource(kNs);
- migrationSource.init(opCtx());
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
{
@@ -638,8 +629,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertWhenRollbackDetected) {
DBDirectClient client(opCtx());
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
- SessionCatalogMigrationSource migrationSource(kNs);
- migrationSource.init(opCtx());
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
{