author    Randolph Tan <randolph@10gen.com>  2017-09-22 11:27:24 -0400
committer Randolph Tan <randolph@10gen.com>  2017-09-26 17:40:27 -0400
commit    457ecaf9ca73456df43e442ddd758b9067a6a002
tree      fcf3351cd12133a77d158848703908c59c56377a  /src/mongo/db/s
parent    db986c959a8e080d038577ae107af60bf2611557
SERVER-31233 Make session_catalog_migration_destination_test do an actual insert
instead of calling onWriteOpCompletedOnPrimary directly, as there is a race where the migration thread might try to retrieve the oplog entry with the dummy timestamp.
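
In essence (a condensed sketch based on the patch below; the helper insertDocWithSessionInfo is the one added in this diff), the tests now seed the newer transaction through a real retryable-write insert rather than by stamping the Session with a dummy Timestamp(100, 3):

    OperationSessionInfo newSessionInfo;
    newSessionInfo.setSessionId(sessionId);
    newSessionInfo.setTxnNumber(20);

    // Runs a real insert on a separate thread via DBDirectClient, so the session's
    // last-write optime points at an actual oplog entry instead of a dummy timestamp.
    insertDocWithSessionInfo(newSessionInfo, kNs, BSON("_id" << "newerSess"), 0);
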
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--  src/mongo/db/s/session_catalog_migration_destination_test.cpp | 117
1 file changed, 80 insertions(+), 37 deletions(-)
diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
index c86cd709701..a1a76dfbd1d 100644
--- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
@@ -32,8 +32,10 @@
#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/logical_session_cache_noop.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/logical_session_id_gen.h"
+#include "mongo/db/ops/write_ops_gen.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/s/migration_session_id.h"
#include "mongo/db/s/session_catalog_migration_destination.h"
@@ -47,6 +49,7 @@
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/sharding_mongod_test_fixture.h"
#include "mongo/stdx/memory.h"
+#include "mongo/stdx/thread.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -80,6 +83,8 @@ repl::OplogEntry extractInnerOplog(const repl::OplogEntry& oplog) {
class SessionCatalogMigrationDestinationTest : public ShardingMongodTestFixture {
public:
void setUp() override {
+ serverGlobalParams.featureCompatibility.version.store(
+ ServerGlobalParams::FeatureCompatibility::Version::k36);
serverGlobalParams.clusterRole = ClusterRole::ShardServer;
ShardingMongodTestFixture::setUp();
@@ -101,6 +106,7 @@ public:
SessionCatalog::create(getServiceContext());
SessionCatalog::get(getServiceContext())->onStepUp(operationContext());
+ LogicalSessionCache::set(getServiceContext(), stdx::make_unique<LogicalSessionCacheNoop>());
}
void tearDown() override {
@@ -167,6 +173,37 @@ public:
}
}
+ void insertDocWithSessionInfo(const OperationSessionInfo& sessionInfo,
+ const NamespaceString& ns,
+ const BSONObj& doc,
+ StmtId stmtId) {
+ // Do write on a separate thread in order not to pollute this thread's opCtx.
+ stdx::thread insertThread([sessionInfo, ns, doc, stmtId] {
+ write_ops::WriteCommandBase cmdBase;
+ std::vector<StmtId> stmtIds;
+ stmtIds.push_back(stmtId);
+ cmdBase.setStmtIds(stmtIds);
+
+ write_ops::Insert insertRequest(ns);
+ std::vector<BSONObj> documents;
+ documents.push_back(doc);
+ insertRequest.setDocuments(documents);
+ insertRequest.setWriteCommandBase(cmdBase);
+
+ BSONObjBuilder insertBuilder;
+ insertRequest.serialize({}, &insertBuilder);
+ sessionInfo.serialize(&insertBuilder);
+
+ Client::initThread("test insert thread");
+ auto innerOpCtx = Client::getCurrent()->makeOperationContext();
+ DBDirectClient client(innerOpCtx.get());
+ BSONObj result;
+ ASSERT_TRUE(client.runCommand(ns.db().toString(), insertBuilder.obj(), result));
+ });
+
+ insertThread.join();
+ }
+
private:
std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient(
std::unique_ptr<DistLockManager> distLockManager) override {
@@ -764,16 +801,15 @@ TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) {
auto opCtx = operationContext();
- {
- // Create a new session entry.
- auto session = getSessionWithTxn(opCtx, sessionId, 20);
- session->beginTxn(opCtx, 20);
-
- Lock::GlobalLock globalLock(opCtx, MODE_IX, UINT_MAX);
- WriteUnitOfWork wunit(opCtx);
- session->onWriteOpCompletedOnPrimary(opCtx, 20, {0}, Timestamp(100, 3));
- wunit.commit();
- }
+ OperationSessionInfo newSessionInfo;
+ newSessionInfo.setSessionId(sessionId);
+ newSessionInfo.setTxnNumber(20);
+
+ insertDocWithSessionInfo(newSessionInfo,
+ kNs,
+ BSON("_id"
+ << "newerSess"),
+ 0);
SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
sessionMigration.start(getServiceContext());
@@ -800,13 +836,15 @@ TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) {
ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
auto session = getSessionWithTxn(opCtx, sessionId, 20);
- ASSERT_EQ(Timestamp(100, 3), session->getLastWriteOpTimeTs(20));
+ TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(20));
+
+ ASSERT_TRUE(historyIter.hasNext());
+ auto oplog = historyIter.next(opCtx);
+ ASSERT_BSONOBJ_EQ(BSON("_id"
+ << "newerSess"),
+ oplog.getObject());
- DBDirectClient client(opCtx);
- auto oplogBSON =
- client.findOne(NamespaceString::kRsOplogNamespace.ns(),
- BSON(repl::OplogEntryBase::kNamespaceFieldName << kNs.toString()));
- ASSERT_TRUE(oplogBSON.isEmpty());
+ ASSERT_FALSE(historyIter.hasNext());
}
TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwrittenByOldMigrateTxn) {
@@ -830,16 +868,15 @@ TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwritt
returnOplog({oplog1});
- {
- // Create a new session entry.
- auto session = getSessionWithTxn(opCtx, sessionId, 20);
- session->beginTxn(opCtx, 20);
-
- Lock::GlobalLock globalLock(opCtx, MODE_IX, UINT_MAX);
- WriteUnitOfWork wunit(opCtx);
- session->onWriteOpCompletedOnPrimary(opCtx, 20, {0}, Timestamp(100, 3));
- wunit.commit();
- }
+ OperationSessionInfo newSessionInfo;
+ newSessionInfo.setSessionId(sessionId);
+ newSessionInfo.setTxnNumber(20);
+
+ insertDocWithSessionInfo(newSessionInfo,
+ kNs,
+ BSON("_id"
+ << "newerSess"),
+ 0);
OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
oplog2.setOperationSessionInfo(oldSessionInfo);
@@ -853,7 +890,13 @@ TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwritt
ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
auto session = getSessionWithTxn(opCtx, sessionId, 20);
- ASSERT_EQ(Timestamp(100, 3), session->getLastWriteOpTimeTs(20));
+ TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(20));
+
+ ASSERT_TRUE(historyIter.hasNext());
+ auto oplog = historyIter.next(opCtx);
+ ASSERT_BSONOBJ_EQ(BSON("_id"
+ << "newerSess"),
+ oplog.getObject());
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyAfterNetworkError) {
@@ -994,16 +1037,11 @@ TEST_F(SessionCatalogMigrationDestinationTest,
returnOplog({oplog1});
- {
- // Create a new session entry.
- auto session = getSessionWithTxn(opCtx, sessionId, 2);
- session->beginTxn(opCtx, 2);
-
- Lock::GlobalLock globalLock(opCtx, MODE_IX, UINT_MAX);
- WriteUnitOfWork wunit(opCtx);
- session->onWriteOpCompletedOnPrimary(opCtx, 2, {0}, Timestamp(100, 3));
- wunit.commit();
- }
+ insertDocWithSessionInfo(sessionInfo,
+ kNs,
+ BSON("_id"
+ << "newerSess"),
+ 0);
OplogEntry oplog2(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
oplog2.setOperationSessionInfo(sessionInfo);
@@ -1022,6 +1060,11 @@ TEST_F(SessionCatalogMigrationDestinationTest,
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog2, historyIter.next(opCtx));
+ auto oplog = historyIter.next(opCtx);
+ ASSERT_BSONOBJ_EQ(BSON("_id"
+ << "newerSess"),
+ oplog.getObject());
+
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx));
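
The verification side changes accordingly (a sketch mirroring the assertions in the hunks above): instead of comparing Session::getLastWriteOpTimeTs() against the dummy Timestamp(100, 3), the tests walk the transaction history from that optime and check the actually inserted document:

    TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(20));
    ASSERT_TRUE(historyIter.hasNext());
    auto oplog = historyIter.next(opCtx);
    ASSERT_BSONOBJ_EQ(BSON("_id" << "newerSess"), oplog.getObject());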