diff options
author | Randolph Tan <randolph@10gen.com> | 2017-09-22 11:27:24 -0400 |
---|---|---|
committer | Randolph Tan <randolph@10gen.com> | 2017-09-26 17:40:27 -0400 |
commit | 457ecaf9ca73456df43e442ddd758b9067a6a002 (patch) | |
tree | fcf3351cd12133a77d158848703908c59c56377a /src/mongo/db/s | |
parent | db986c959a8e080d038577ae107af60bf2611557 (diff) | |
download | mongo-457ecaf9ca73456df43e442ddd758b9067a6a002.tar.gz |
SERVER-31233 Make session_catalog_migration_destination_test do an actual insert
instead of calling onWriteOpCompletedOnPrimary directly, as there is race where the migration thread might try to retrieve the oplog with the dummy timestamp.
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_destination_test.cpp | 117 |
1 files changed, 80 insertions, 37 deletions
diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp index c86cd709701..a1a76dfbd1d 100644 --- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp @@ -32,8 +32,10 @@ #include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/dbdirectclient.h" +#include "mongo/db/logical_session_cache_noop.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/logical_session_id_gen.h" +#include "mongo/db/ops/write_ops_gen.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/s/migration_session_id.h" #include "mongo/db/s/session_catalog_migration_destination.h" @@ -47,6 +49,7 @@ #include "mongo/s/client/shard_registry.h" #include "mongo/s/sharding_mongod_test_fixture.h" #include "mongo/stdx/memory.h" +#include "mongo/stdx/thread.h" #include "mongo/unittest/unittest.h" namespace mongo { @@ -80,6 +83,8 @@ repl::OplogEntry extractInnerOplog(const repl::OplogEntry& oplog) { class SessionCatalogMigrationDestinationTest : public ShardingMongodTestFixture { public: void setUp() override { + serverGlobalParams.featureCompatibility.version.store( + ServerGlobalParams::FeatureCompatibility::Version::k36); serverGlobalParams.clusterRole = ClusterRole::ShardServer; ShardingMongodTestFixture::setUp(); @@ -101,6 +106,7 @@ public: SessionCatalog::create(getServiceContext()); SessionCatalog::get(getServiceContext())->onStepUp(operationContext()); + LogicalSessionCache::set(getServiceContext(), stdx::make_unique<LogicalSessionCacheNoop>()); } void tearDown() override { @@ -167,6 +173,37 @@ public: } } + void insertDocWithSessionInfo(const OperationSessionInfo& sessionInfo, + const NamespaceString& ns, + const BSONObj& doc, + StmtId stmtId) { + // Do write on a separate thread in order not to pollute this thread's opCtx. + stdx::thread insertThread([sessionInfo, ns, doc, stmtId] { + write_ops::WriteCommandBase cmdBase; + std::vector<StmtId> stmtIds; + stmtIds.push_back(stmtId); + cmdBase.setStmtIds(stmtIds); + + write_ops::Insert insertRequest(ns); + std::vector<BSONObj> documents; + documents.push_back(doc); + insertRequest.setDocuments(documents); + insertRequest.setWriteCommandBase(cmdBase); + + BSONObjBuilder insertBuilder; + insertRequest.serialize({}, &insertBuilder); + sessionInfo.serialize(&insertBuilder); + + Client::initThread("test insert thread"); + auto innerOpCtx = Client::getCurrent()->makeOperationContext(); + DBDirectClient client(innerOpCtx.get()); + BSONObj result; + ASSERT_TRUE(client.runCommand(ns.db().toString(), insertBuilder.obj(), result)); + }); + + insertThread.join(); + } + private: std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient( std::unique_ptr<DistLockManager> distLockManager) override { @@ -764,16 +801,15 @@ TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) { auto opCtx = operationContext(); - { - // Create a new session entry. - auto session = getSessionWithTxn(opCtx, sessionId, 20); - session->beginTxn(opCtx, 20); - - Lock::GlobalLock globalLock(opCtx, MODE_IX, UINT_MAX); - WriteUnitOfWork wunit(opCtx); - session->onWriteOpCompletedOnPrimary(opCtx, 20, {0}, Timestamp(100, 3)); - wunit.commit(); - } + OperationSessionInfo newSessionInfo; + newSessionInfo.setSessionId(sessionId); + newSessionInfo.setTxnNumber(20); + + insertDocWithSessionInfo(newSessionInfo, + kNs, + BSON("_id" + << "newerSess"), + 0); SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); sessionMigration.start(getServiceContext()); @@ -800,13 +836,15 @@ TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) { ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState()); auto session = getSessionWithTxn(opCtx, sessionId, 20); - ASSERT_EQ(Timestamp(100, 3), session->getLastWriteOpTimeTs(20)); + TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(20)); + + ASSERT_TRUE(historyIter.hasNext()); + auto oplog = historyIter.next(opCtx); + ASSERT_BSONOBJ_EQ(BSON("_id" + << "newerSess"), + oplog.getObject()); - DBDirectClient client(opCtx); - auto oplogBSON = - client.findOne(NamespaceString::kRsOplogNamespace.ns(), - BSON(repl::OplogEntryBase::kNamespaceFieldName << kNs.toString())); - ASSERT_TRUE(oplogBSON.isEmpty()); + ASSERT_FALSE(historyIter.hasNext()); } TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwrittenByOldMigrateTxn) { @@ -830,16 +868,15 @@ TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwritt returnOplog({oplog1}); - { - // Create a new session entry. - auto session = getSessionWithTxn(opCtx, sessionId, 20); - session->beginTxn(opCtx, 20); - - Lock::GlobalLock globalLock(opCtx, MODE_IX, UINT_MAX); - WriteUnitOfWork wunit(opCtx); - session->onWriteOpCompletedOnPrimary(opCtx, 20, {0}, Timestamp(100, 3)); - wunit.commit(); - } + OperationSessionInfo newSessionInfo; + newSessionInfo.setSessionId(sessionId); + newSessionInfo.setTxnNumber(20); + + insertDocWithSessionInfo(newSessionInfo, + kNs, + BSON("_id" + << "newerSess"), + 0); OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80)); oplog2.setOperationSessionInfo(oldSessionInfo); @@ -853,7 +890,13 @@ TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwritt ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState()); auto session = getSessionWithTxn(opCtx, sessionId, 20); - ASSERT_EQ(Timestamp(100, 3), session->getLastWriteOpTimeTs(20)); + TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(20)); + + ASSERT_TRUE(historyIter.hasNext()); + auto oplog = historyIter.next(opCtx); + ASSERT_BSONOBJ_EQ(BSON("_id" + << "newerSess"), + oplog.getObject()); } TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyAfterNetworkError) { @@ -994,16 +1037,11 @@ TEST_F(SessionCatalogMigrationDestinationTest, returnOplog({oplog1}); - { - // Create a new session entry. - auto session = getSessionWithTxn(opCtx, sessionId, 2); - session->beginTxn(opCtx, 2); - - Lock::GlobalLock globalLock(opCtx, MODE_IX, UINT_MAX); - WriteUnitOfWork wunit(opCtx); - session->onWriteOpCompletedOnPrimary(opCtx, 2, {0}, Timestamp(100, 3)); - wunit.commit(); - } + insertDocWithSessionInfo(sessionInfo, + kNs, + BSON("_id" + << "newerSess"), + 0); OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80)); oplog2.setOperationSessionInfo(sessionInfo); @@ -1022,6 +1060,11 @@ TEST_F(SessionCatalogMigrationDestinationTest, ASSERT_TRUE(historyIter.hasNext()); checkOplogWithNestedOplog(oplog2, historyIter.next(opCtx)); + auto oplog = historyIter.next(opCtx); + ASSERT_BSONOBJ_EQ(BSON("_id" + << "newerSess"), + oplog.getObject()); + ASSERT_TRUE(historyIter.hasNext()); checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx)); |