author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>    2018-12-01 02:39:29 -0500
committer  Kaloian Manassiev <kaloian.manassiev@mongodb.com>    2018-12-07 12:54:22 -0500
commit     6f06d901a8d6602b466a4dc18e8ecf2f6e8f1d5b (patch)
tree       0c136de4c437de925f59eefc6751168784b8c7bc
parent     00bf03c454e7796d507ad2e1ee80c5d75e270879 (diff)
download   mongo-6f06d901a8d6602b466a4dc18e8ecf2f6e8f1d5b.tar.gz
SERVER-38371 Do not skip subsequent session records if one session's migration fails
(cherry picked from commit f036c87b5f4c9d6a82338aa8680b5c6f2b541026)
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy.h          15
-rw-r--r--  src/mongo/db/s/session_catalog_migration_destination.cpp       165
-rw-r--r--  src/mongo/db/s/session_catalog_migration_destination_test.cpp  157
-rw-r--r--  src/mongo/db/s/session_catalog_migration_source_test.cpp       5
4 files changed, 212 insertions, 130 deletions
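
The gist of the fix, as a minimal standalone sketch (not the actual server code; 'applyEntry', 'SkippableError' and the batch contents are hypothetical stand-ins): the try/catch that used to wrap the whole batch of session oplog entries now wraps each individual entry, so an error such as TransactionTooOld raised for one session no longer drops every session that follows it in the same batch.

// Minimal sketch of the control-flow change, not the real MongoDB code.
// 'applyEntry' and 'SkippableError' are hypothetical stand-ins for
// processSessionOplog() and the ConflictingOperationInProgress /
// TransactionTooOld errors it can raise.
#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

struct SkippableError : std::runtime_error {
    using std::runtime_error::runtime_error;
};

void applyEntry(const std::string& entry) {
    if (entry == "staleTxn")
        throw SkippableError("recipient already has a newer txnNumber");
    std::cout << "applied " << entry << "\n";
}

// Old behaviour: one try/catch around the whole batch, so the first skippable
// error silently dropped every subsequent session's entries.
void processBatchOld(const std::vector<std::string>& batch) {
    try {
        for (const auto& entry : batch)
            applyEntry(entry);
    } catch (const SkippableError&) {
        // the offending entry *and* everything after it were skipped
    }
}

// New behaviour (this commit): each entry has its own try/catch, so only the
// offending entry is skipped and the remaining sessions still migrate.
void processBatchNew(const std::vector<std::string>& batch) {
    for (const auto& entry : batch) {
        try {
            applyEntry(entry);
        } catch (const SkippableError&) {
            continue;
        }
    }
}

int main() {
    const std::vector<std::string> batch{"session1", "staleTxn", "session2"};
    std::cout << "old:\n";
    processBatchOld(batch);  // applies session1 only
    std::cout << "new:\n";
    processBatchNew(batch);  // applies session1 and session2
    return 0;
}
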
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index 0cacc08e41c..b257bd6bc24 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -146,12 +146,15 @@ public:
Status nextModsBatch(OperationContext* opCtx, Database* db, BSONObjBuilder* builder);
/**
- * Appends to the buffer oplogs that contain session information for this migration.
- * If this function returns a valid OpTime, this means that the oplog appended are
- * not guaranteed to be majority committed and the caller has to use wait for the
- * returned opTime to be majority committed. If the underlying SessionCatalogMigrationSource
- * does not exist, that means this node is running as a standalone and doesn't support retryable
- * writes, so we return boost::none.
+ * Appends to 'arrBuilder' oplog entries which wrote to the currently migrated chunk and contain
+ * session information.
+ *
+ * If this function returns a valid OpTime, this means that the oplog appended are not
+ * guaranteed to be majority committed and the caller has to wait for the returned opTime to be
+ * majority committed before returning them to the donor shard.
+ *
+ * If the underlying SessionCatalogMigrationSource does not exist, that means this node is
+ * running as a standalone and doesn't support retryable writes, so we return boost::none.
*
* This waiting is necessary because session migration is only allowed to send out committed
* entries, as opposed to chunk migration, which can send out uncommitted documents. With chunk
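
As context for the contract documented in the comment above, the caller side can be sketched like this (simplified stand-in types only, not the real server API; 'appendSessionEntries' and 'waitUntilMajorityCommitted' are hypothetical names): a returned OpTime means the appended entries may not be majority committed yet and must be waited on before the batch is handed back, while an empty optional means the node is a standalone with no session data to transfer.

// Sketch of the contract documented above, with stand-in types rather than
// the real server API.
#include <iostream>
#include <optional>
#include <vector>

struct OpTime {
    long long ts = 0;
};

// Hypothetical stand-in for nextSessionMigrationBatch(): fills the builder and
// returns the optime of the newest entry, or nullopt on a standalone node.
std::optional<OpTime> appendSessionEntries(std::vector<int>& arrBuilder) {
    arrBuilder = {1, 2, 3};  // pretend these are session oplog entries
    return OpTime{42};       // appended, but possibly not yet majority committed
}

// Hypothetical stand-in for waiting on majority write concern.
void waitUntilMajorityCommitted(OpTime t) {
    std::cout << "waiting for optime " << t.ts << " to become majority committed\n";
}

int main() {
    std::vector<int> batch;
    if (auto opTime = appendSessionEntries(batch)) {
        // Block here so the other side never receives a session record that
        // could later be rolled back.
        waitUntilMajorityCommitted(*opTime);
    }
    std::cout << "shipping " << batch.size() << " session entries\n";
    return 0;
}
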
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index 3b9969bebfb..5353a4e6d5d 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -59,10 +59,11 @@ const WriteConcernOptions kMajorityWC(WriteConcernOptions::kMajority,
Milliseconds(0));
struct ProcessOplogResult {
- bool isPrePostImage = false;
- repl::OpTime oplogTime;
LogicalSessionId sessionId;
TxnNumber txnNum;
+
+ repl::OpTime oplogTime;
+ bool isPrePostImage = false;
};
/**
@@ -140,12 +141,13 @@ repl::OplogLink extractPrePostImageTs(const ProcessOplogResult& lastResult,
* Parses the oplog into an oplog entry and makes sure that it contains the expected fields.
*/
repl::OplogEntry parseOplog(const BSONObj& oplogBSON) {
- auto oplogStatus = repl::OplogEntry::parse(oplogBSON);
- uassertStatusOK(oplogStatus.getStatus());
+ auto oplogEntry = uassertStatusOK(repl::OplogEntry::parse(oplogBSON));
- auto oplogEntry = oplogStatus.getValue();
+ // Session oplog entries must always contain wall clock time, because we will not be
+ // transferring anything from a previous version of the server
+ invariant(oplogEntry.getWallClockTime());
- auto sessionInfo = oplogEntry.getOperationSessionInfo();
+ const auto& sessionInfo = oplogEntry.getOperationSessionInfo();
uassert(ErrorCodes::UnsupportedFormat,
str::stream() << "oplog with opTime " << oplogEntry.getTimestamp().toString()
@@ -204,8 +206,12 @@ BSONObj getNextSessionOplogBatch(OperationContext* opCtx,
ProcessOplogResult processSessionOplog(OperationContext* opCtx,
const BSONObj& oplogBSON,
const ProcessOplogResult& lastResult) {
- ProcessOplogResult result;
auto oplogEntry = parseOplog(oplogBSON);
+ const auto& sessionInfo = oplogEntry.getOperationSessionInfo();
+
+ ProcessOplogResult result;
+ result.sessionId = *sessionInfo.getSessionId();
+ result.txnNum = *sessionInfo.getTxnNumber();
BSONObj object2;
if (oplogEntry.getOpType() == repl::OpTypeEnum::kNoop) {
@@ -231,22 +237,15 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
<< ", oplog ts: "
<< oplogEntry.getTimestamp().toString()
<< ": "
- << redact(oplogBSON),
+ << oplogBSON,
!lastResult.isPrePostImage);
}
} else {
object2 = oplogBSON; // TODO: strip redundant info?
}
- const auto& sessionInfo = oplogEntry.getOperationSessionInfo();
- result.sessionId = sessionInfo.getSessionId().value();
- result.txnNum = sessionInfo.getTxnNumber().value();
const auto stmtId = *oplogEntry.getStatementId();
- // Session oplog entries must always contain wall clock time, because we will not be
- // transferring anything from a previous version of the server
- invariant(oplogEntry.getWallClockTime());
-
auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, result.sessionId);
scopedSession->beginTxn(opCtx, result.txnNum);
@@ -255,6 +254,11 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
return lastResult;
}
} catch (const DBException& ex) {
+ // If the transaction chain was truncated on the recipient shard, then we are most likely
+ // copying from a session that hasn't been touched on the recipient shard for a very long
+ // time but could be recent on the donor.
+ //
+ // We continue copying regardless to get the entire transaction from the donor.
if (ex.code() != ErrorCodes::IncompleteTransactionHistory) {
throw;
}
@@ -297,7 +301,7 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
oplogLink,
OplogSlot());
- auto oplogOpTime = result.oplogTime;
+ const auto& oplogOpTime = result.oplogTime;
uassert(40633,
str::stream() << "Failed to create new oplog entry for oplog with opTime: "
<< oplogEntry.getOpTime().toString()
@@ -341,8 +345,21 @@ void SessionCatalogMigrationDestination::start(ServiceContext* service) {
_isStateChanged.notify_all();
}
- _thread = stdx::thread(stdx::bind(
- &SessionCatalogMigrationDestination::_retrieveSessionStateFromSource, this, service));
+ _thread = stdx::thread([=] {
+ try {
+ _retrieveSessionStateFromSource(service);
+ } catch (const DBException& ex) {
+ if (ex.code() == ErrorCodes::CommandNotFound) {
+ // TODO: remove this after v3.7
+ //
+ // This means that the donor shard is running at an older version so it is safe to
+ // just end this because there is no session information to transfer.
+ return;
+ }
+
+ _errorOccurred(ex.toString());
+ }
+ });
}
void SessionCatalogMigrationDestination::finish() {
@@ -390,85 +407,67 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
}
}
- try {
- auto nextBatch = getNextSessionOplogBatch(opCtx, _fromShard, _migrationSessionId);
- BSONArray oplogArray(nextBatch[kOplogField].Obj());
- BSONArrayIteratorSorted oplogIter(oplogArray);
-
- if (!oplogIter.more()) {
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_state == State::Committing) {
- // The migration is considered done only when it gets an empty result from
- // the source shard while this is in state committing. This is to make sure
- // that it doesn't miss any new oplog created between the time window where
- // this depleted the buffer from the source shard and receiving the commit
- // command.
- if (oplogDrainedAfterCommiting) {
- break;
- }
-
- oplogDrainedAfterCommiting = true;
+ auto nextBatch = getNextSessionOplogBatch(opCtx, _fromShard, _migrationSessionId);
+ BSONArray oplogArray(nextBatch[kOplogField].Obj());
+ BSONArrayIteratorSorted oplogIter(oplogArray);
+
+ if (!oplogIter.more()) {
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (_state == State::Committing) {
+ // The migration is considered done only when it gets an empty result from the
+ // source shard while this is in state committing. This is to make sure that it
+ // doesn't miss any new oplog created between the time window where this
+ // depleted the buffer from the source shard and receiving the commit command.
+ if (oplogDrainedAfterCommiting) {
+ break;
}
- }
-
- WriteConcernResult wcResult;
- auto wcStatus =
- waitForWriteConcern(opCtx, lastResult.oplogTime, kMajorityWC, &wcResult);
- if (!wcStatus.isOK()) {
- _errorOccurred(wcStatus.toString());
- return;
- }
- // We depleted the buffer at least once, transition to ready for commit.
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- // Note: only transition to "ready to commit" if state is not error/force stop.
- if (_state == State::Migrating) {
- _state = State::ReadyToCommit;
- _isStateChanged.notify_all();
- }
+ oplogDrainedAfterCommiting = true;
}
+ }
- if (lastOpTimeWaited == lastResult.oplogTime) {
- // We got an empty result at least twice in a row from the source shard so
- // space it out a little bit so we don't hammer the shard.
- opCtx->sleepFor(Milliseconds(200));
+ WriteConcernResult unusedWCResult;
+ uassertStatusOK(
+ waitForWriteConcern(opCtx, lastResult.oplogTime, kMajorityWC, &unusedWCResult));
+
+ // We depleted the buffer at least once, transition to ready for commit.
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ // Note: only transition to "ready to commit" if state is not error/force stop.
+ if (_state == State::Migrating) {
+ _state = State::ReadyToCommit;
+ _isStateChanged.notify_all();
}
+ }
- lastOpTimeWaited = lastResult.oplogTime;
+ if (lastOpTimeWaited == lastResult.oplogTime) {
+ // We got an empty result at least twice in a row from the source shard so space it
+ // out a little bit so we don't hammer the shard
+ opCtx->sleepFor(Milliseconds(200));
}
- while (oplogIter.more()) {
+ lastOpTimeWaited = lastResult.oplogTime;
+ }
+
+ while (oplogIter.more()) {
+ try {
lastResult = processSessionOplog(opCtx, oplogIter.next().Obj(), lastResult);
- }
- } catch (const DBException& excep) {
- if (excep.code() == ErrorCodes::ConflictingOperationInProgress ||
- excep.code() == ErrorCodes::TransactionTooOld) {
- // This means that the server has a newer txnNumber than the oplog being migrated,
- // so just skip it.
- continue;
- }
+ } catch (const DBException& ex) {
+ if (ex.code() == ErrorCodes::ConflictingOperationInProgress ||
+ ex.code() == ErrorCodes::TransactionTooOld) {
+ // This means that the server has a newer txnNumber than the oplog being
+ // migrated, so just skip it
+ continue;
+ }
- if (excep.code() == ErrorCodes::CommandNotFound) {
- // TODO: remove this after v3.7
- //
- // This means that the donor shard is running at an older version so it is safe to
- // just end this because there is no session information to transfer.
- break;
+ throw;
}
-
- _errorOccurred(excep.toString());
- return;
}
}
- WriteConcernResult wcResult;
- auto wcStatus = waitForWriteConcern(opCtx, lastResult.oplogTime, kMajorityWC, &wcResult);
- if (!wcStatus.isOK()) {
- _errorOccurred(wcStatus.toString());
- return;
- }
+ WriteConcernResult unusedWCResult;
+ uassertStatusOK(waitForWriteConcern(opCtx, lastResult.oplogTime, kMajorityWC, &unusedWCResult));
{
stdx::lock_guard<stdx::mutex> lk(_mutex);
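
The drain-then-commit handshake preserved by the loop above can be summarised with a small standalone simulation (stand-in types only, not the real server code): the destination finishes only after it has received an empty batch at least twice while in the Committing state, which is why the tests in this diff always return two trailing empty batches.

// Standalone simulation of the drain-then-commit handshake implemented by
// _retrieveSessionStateFromSource. All types are simplified stand-ins.
#include <iostream>
#include <queue>
#include <vector>

enum class State { Migrating, ReadyToCommit, Committing, Done };

int main() {
    // Batches the donor will return, oldest first; an empty vector is an empty batch.
    std::queue<std::vector<int>> donorBatches;
    donorBatches.push({1, 2});  // initial session entries
    donorBatches.push({});      // buffer drained once -> ReadyToCommit
    donorBatches.push({3});     // a late write that slipped in before the commit
    donorBatches.push({});      // first empty batch while Committing
    donorBatches.push({});      // second empty batch while Committing -> done

    State state = State::Migrating;
    bool drainedAfterCommitting = false;
    int fetches = 0;

    while (!donorBatches.empty()) {
        const auto batch = donorBatches.front();
        donorBatches.pop();
        ++fetches;

        // Pretend finish() is called (commit requested) at the third fetch.
        if (fetches == 3)
            state = State::Committing;

        if (batch.empty()) {
            if (state == State::Committing) {
                if (drainedAfterCommitting)
                    break;  // drained twice while committing: nothing was missed
                drainedAfterCommitting = true;
            } else if (state == State::Migrating) {
                state = State::ReadyToCommit;  // drained at least once
            }
            continue;  // (the real code also waits for majority write concern here)
        }

        std::cout << "applied batch of " << batch.size() << " entries\n";
    }

    state = State::Done;
    std::cout << "done after " << fetches << " fetches\n";
    return 0;
}
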
diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
index fe71b57e88c..4614ab30602 100644
--- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
@@ -220,7 +220,7 @@ public:
Session* session,
TxnNumber txnNumber,
StmtId stmtId,
- repl::OplogEntry& expectedOplog) {
+ const repl::OplogEntry& expectedOplog) {
auto oplog = session->checkStatementExecuted(opCtx, txnNumber, stmtId);
ASSERT_TRUE(oplog);
checkOplogWithNestedOplog(expectedOplog, *oplog);
@@ -1510,41 +1510,38 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatem
}
TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithIncompleteHistory) {
- const auto sessionId = makeLogicalSessionIdForTest();
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(makeLogicalSessionIdForTest());
+ sessionInfo.setTxnNumber(2);
+ const std::vector<repl::OplogEntry> oplogEntries{
+ makeOplogEntry(OpTime(Timestamp(100, 2), 1), // optime
+ OpTypeEnum::kInsert, // op type
+ BSON("x" << 100), // o
+ boost::none, // o2
+ sessionInfo, // session info
+ Date_t::now(), // wall clock time
+ 23), // statement id
+ makeOplogEntry(OpTime(Timestamp(80, 2), 1), // optime
+ OpTypeEnum::kNoop, // op type
+ {}, // o
+ Session::kDeadEndSentinel, // o2
+ sessionInfo, // session info
+ Date_t::now(), // wall clock time
+ kIncompleteHistoryStmtId), // statement id
+ makeOplogEntry(OpTime(Timestamp(60, 2), 1), // optime
+ OpTypeEnum::kInsert, // op type
+ BSON("x" << 60), // o
+ boost::none, // o2
+ sessionInfo, // session info
+ Date_t::now(), // wall clock time
+ 5)}; // statement id
SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
sessionMigration.start(getServiceContext());
sessionMigration.finish();
- OperationSessionInfo sessionInfo;
- sessionInfo.setSessionId(sessionId);
- sessionInfo.setTxnNumber(2);
-
- auto oplog1 = makeOplogEntry(OpTime(Timestamp(100, 2), 1), // optime
- OpTypeEnum::kInsert, // op type
- BSON("x" << 100), // o
- boost::none, // o2
- sessionInfo, // session info
- Date_t::now(), // wall clock time
- 23); // statement id
-
- auto oplog2 = makeOplogEntry(OpTime(Timestamp(80, 2), 1), // optime
- OpTypeEnum::kNoop, // op type
- {}, // o
- Session::kDeadEndSentinel, // o2
- sessionInfo, // session info
- Date_t::now(), // wall clock time
- kIncompleteHistoryStmtId); // statement id
+ returnOplog(oplogEntries);
- auto oplog3 = makeOplogEntry(OpTime(Timestamp(60, 2), 1), // optime
- OpTypeEnum::kInsert, // op type
- BSON("x" << 60), // o
- boost::none, // o2
- sessionInfo, // session info
- Date_t::now(), // wall clock time
- 5); // statement id
-
- returnOplog({oplog1, oplog2, oplog3});
// migration always fetches at least twice to transition from committing to done.
returnOplog({});
returnOplog({});
@@ -1553,26 +1550,110 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithIncompleteHistory
ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
- auto opCtx = operationContext();
- auto session = getSessionWithTxn(opCtx, sessionId, 2);
+ const auto opCtx = operationContext();
+ auto session = getSessionWithTxn(opCtx, *sessionInfo.getSessionId(), 2);
TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2));
ASSERT_TRUE(historyIter.hasNext());
- checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
+ checkOplogWithNestedOplog(oplogEntries[2], historyIter.next(opCtx));
ASSERT_TRUE(historyIter.hasNext());
- checkOplog(oplog2, historyIter.next(opCtx));
+ checkOplog(oplogEntries[1], historyIter.next(opCtx));
ASSERT_TRUE(historyIter.hasNext());
- checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx));
+ checkOplogWithNestedOplog(oplogEntries[0], historyIter.next(opCtx));
ASSERT_FALSE(historyIter.hasNext());
- checkStatementExecuted(opCtx, session.get(), 2, 23, oplog1);
- checkStatementExecuted(opCtx, session.get(), 2, 5, oplog3);
+ checkStatementExecuted(opCtx, session.get(), 2, 23, oplogEntries[0]);
+ checkStatementExecuted(opCtx, session.get(), 2, 5, oplogEntries[2]);
ASSERT_THROWS(session->checkStatementExecuted(opCtx, 2, 38), AssertionException);
}
-} // namespace
+TEST_F(SessionCatalogMigrationDestinationTest,
+ OplogEntriesWithOldTransactionFollowedByUpToDateEntries) {
+ auto opCtx = operationContext();
+
+ OperationSessionInfo sessionInfo1;
+ sessionInfo1.setSessionId(makeLogicalSessionIdForTest());
+ sessionInfo1.setTxnNumber(2);
+ {
+ // "Start" a new transaction on session 1, so that migrating the entries above will result
+ // in TransactionTooOld. This should not preclude the entries for session 2 from getting
+ // applied.
+ auto scopedSession =
+ SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, *sessionInfo1.getSessionId());
+ scopedSession->refreshFromStorageIfNeeded(opCtx);
+ scopedSession->beginTxn(opCtx, 3);
+ }
+
+ OperationSessionInfo sessionInfo2;
+ sessionInfo2.setSessionId(makeLogicalSessionIdForTest());
+ sessionInfo2.setTxnNumber(15);
+
+ const std::vector<repl::OplogEntry> oplogEntries{
+ // Session 1 entries
+ makeOplogEntry(OpTime(Timestamp(100, 2), 1), // optime
+ OpTypeEnum::kInsert, // op type
+ BSON("x" << 100), // o
+ boost::none, // o2
+ sessionInfo1, // session info
+ Date_t::now(), // wall clock time
+ 23), // statement id
+
+ // Session 2 entries
+ makeOplogEntry(OpTime(Timestamp(50, 2), 1), // optime
+ OpTypeEnum::kInsert, // op type
+ BSON("x" << 50), // o
+ boost::none, // o2
+ sessionInfo2, // session info
+ Date_t::now(), // wall clock time
+ 56), // statement id
+ makeOplogEntry(OpTime(Timestamp(20, 2), 1), // optime
+ OpTypeEnum::kInsert, // op type
+ BSON("x" << 20), // o
+ boost::none, // o2
+ sessionInfo2, // session info
+ Date_t::now(), // wall clock time
+ 55)}; // statement id
+
+ SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
+ sessionMigration.start(getServiceContext());
+ sessionMigration.finish();
+
+ returnOplog(oplogEntries);
+
+ // migration always fetches at least twice to transition from committing to done.
+ returnOplog({});
+ returnOplog({});
+
+ sessionMigration.join();
+
+ ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
+
+ // Check nothing was written for session 1
+ {
+ auto session1 = getSessionWithTxn(opCtx, *sessionInfo1.getSessionId(), 3);
+ ASSERT(session1->getLastWriteOpTime(3).isNull());
+ }
+
+ // Check session 2 was correctly updated
+ {
+ auto session2 = getSessionWithTxn(opCtx, *sessionInfo2.getSessionId(), 15);
+
+ TransactionHistoryIterator historyIter(session2->getLastWriteOpTime(15));
+ ASSERT_TRUE(historyIter.hasNext());
+ checkOplogWithNestedOplog(oplogEntries[2], historyIter.next(opCtx));
+
+ ASSERT_TRUE(historyIter.hasNext());
+ checkOplogWithNestedOplog(oplogEntries[1], historyIter.next(opCtx));
+
+ ASSERT_FALSE(historyIter.hasNext());
+ checkStatementExecuted(opCtx, session2.get(), 15, 56, oplogEntries[1]);
+ checkStatementExecuted(opCtx, session2.get(), 15, 55, oplogEntries[2]);
+ }
+}
+
+} // namespace
} // namespace mongo
diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp
index 5b1e76a1e18..5df4116685b 100644
--- a/src/mongo/db/s/session_catalog_migration_source_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp
@@ -41,15 +41,14 @@
#include "mongo/unittest/unittest.h"
namespace mongo {
+namespace {
using executor::RemoteCommandRequest;
-namespace {
+const NamespaceString kNs("a.b");
class SessionCatalogMigrationSourceTest : public MockReplCoordServerFixture {};
-const NamespaceString kNs("a.b");
-
/**
* Creates an OplogEntry with given parameters and preset defaults for this test suite.
*/