summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2017-09-19 14:44:36 -0400
committerRandolph Tan <randolph@10gen.com>2017-09-26 19:11:04 -0400
commit85d9721c00d7020af78fe60453f8362380fe697d (patch)
tree197240934bc6e849964449587234b526916e512d /src
parent457ecaf9ca73456df43e442ddd758b9067a6a002 (diff)
downloadmongo-85d9721c00d7020af78fe60453f8362380fe697d.tar.gz
SERVER-30894 Implement command for transferring session information during migration
Diffstat (limited to 'src')
-rw-r--r--src/mongo/base/error_codes.err1
-rw-r--r--src/mongo/db/op_observer_impl.cpp95
-rw-r--r--src/mongo/db/ops/write_ops_retryability.cpp10
-rw-r--r--src/mongo/db/ops/write_ops_retryability_test.cpp17
-rw-r--r--src/mongo/db/repl/oplog.cpp4
-rw-r--r--src/mongo/db/repl/oplog.h3
-rw-r--r--src/mongo/db/s/collection_sharding_state.cpp19
-rw-r--r--src/mongo/db/s/collection_sharding_state.h12
-rw-r--r--src/mongo/db/s/collection_sharding_state_test.cpp10
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source.h15
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp75
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.h19
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp52
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp16
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp23
-rw-r--r--src/mongo/db/s/migration_destination_manager.h3
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination.cpp107
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination.h13
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination_test.cpp53
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.cpp9
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.h1
-rw-r--r--src/mongo/db/s/session_catalog_migration_source_test.cpp2
22 files changed, 420 insertions, 139 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index 80a17b9ccf1..24825fb09ba 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -225,6 +225,7 @@ error_code("JSONSchemaNotAllowed", 224)
error_code("TransactionTooOld", 225)
error_code("AtomicityFailure", 226)
error_code("CannotImplicitlyCreateCollection", 227);
+error_code("SessionTransferIncomplete", 228)
# Error codes 4000-8999 are reserved.
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 9a6bd8e5be7..b466b70c3bb 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -115,10 +115,15 @@ BSONObj makeCollModCmdObj(const BSONObj& collModCmd,
return cmdObjBuilder.obj();
}
+struct OpTimeBundle {
+ repl::OpTime writeOpTime;
+ repl::OpTime prePostImageOpTime;
+};
+
/**
* Write oplog entry(ies) for the update operation.
*/
-repl::OpTime replLogUpdate(OperationContext* opCtx,
+OpTimeBundle replLogUpdate(OperationContext* opCtx,
Session* session,
const OplogUpdateEntryArgs& args) {
BSONObj storeObj;
@@ -138,6 +143,8 @@ repl::OpTime replLogUpdate(OperationContext* opCtx,
oplogLink.prevTs = session->getLastWriteOpTimeTs(*opCtx->getTxnNumber());
}
+ OpTimeBundle opTimes;
+
if (!storeObj.isEmpty() && opCtx->getTxnNumber()) {
auto noteUpdateOpTime = repl::logOp(opCtx,
"n",
@@ -150,6 +157,8 @@ repl::OpTime replLogUpdate(OperationContext* opCtx,
args.stmtId,
{});
+ opTimes.prePostImageOpTime = noteUpdateOpTime;
+
if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PreImage) {
oplogLink.preImageTs = noteUpdateOpTime.getTimestamp();
} else if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PostImage) {
@@ -157,22 +166,24 @@ repl::OpTime replLogUpdate(OperationContext* opCtx,
}
}
- return repl::logOp(opCtx,
- "u",
- args.nss,
- args.uuid,
- args.update,
- &args.criteria,
- args.fromMigrate,
- sessionInfo,
- args.stmtId,
- oplogLink);
+ opTimes.writeOpTime = repl::logOp(opCtx,
+ "u",
+ args.nss,
+ args.uuid,
+ args.update,
+ &args.criteria,
+ args.fromMigrate,
+ sessionInfo,
+ args.stmtId,
+ oplogLink);
+
+ return opTimes;
}
/**
* Write oplog entry(ies) for the delete operation.
*/
-repl::OpTime replLogDelete(OperationContext* opCtx,
+OpTimeBundle replLogDelete(OperationContext* opCtx,
const NamespaceString& nss,
OptionalCollectionUUID uuid,
Session* session,
@@ -189,22 +200,26 @@ repl::OpTime replLogDelete(OperationContext* opCtx,
oplogLink.prevTs = session->getLastWriteOpTimeTs(*opCtx->getTxnNumber());
}
+ OpTimeBundle opTimes;
+
if (deletedDoc && opCtx->getTxnNumber()) {
auto noteOplog = repl::logOp(
opCtx, "n", nss, uuid, deletedDoc.get(), nullptr, false, sessionInfo, stmtId, {});
+ opTimes.prePostImageOpTime = noteOplog;
oplogLink.preImageTs = noteOplog.getTimestamp();
}
- return repl::logOp(opCtx,
- "d",
- nss,
- uuid,
- deleteState.documentKey,
- nullptr,
- fromMigrate,
- sessionInfo,
- stmtId,
- oplogLink);
+ opTimes.writeOpTime = repl::logOp(opCtx,
+ "d",
+ nss,
+ uuid,
+ deleteState.documentKey,
+ nullptr,
+ fromMigrate,
+ sessionInfo,
+ stmtId,
+ oplogLink);
+ return opTimes;
}
} // namespace
@@ -253,7 +268,7 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx,
auto css = CollectionShardingState::get(opCtx, systemIndexes);
if (!fromMigrate) {
- css->onInsertOp(opCtx, indexDoc);
+ css->onInsertOp(opCtx, indexDoc, {});
}
}
@@ -264,15 +279,20 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
std::vector<InsertStatement>::const_iterator end,
bool fromMigrate) {
Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr;
- const auto lastOpTime = repl::logInsertOps(opCtx, nss, uuid, session, begin, end, fromMigrate);
+
+ const size_t count = end - begin;
+ auto timestamps = stdx::make_unique<Timestamp[]>(count);
+ const auto lastOpTime =
+ repl::logInsertOps(opCtx, nss, uuid, session, begin, end, timestamps.get(), fromMigrate);
auto css = CollectionShardingState::get(opCtx, nss.ns());
- for (auto it = begin; it != end; it++) {
+ size_t index = 0;
+ for (auto it = begin; it != end; it++, index++) {
AuthorizationManager::get(opCtx->getServiceContext())
->logOp(opCtx, "i", nss, it->doc, nullptr);
if (!fromMigrate) {
- css->onInsertOp(opCtx, it->doc);
+ css->onInsertOp(opCtx, it->doc, timestamps[index]);
}
}
@@ -312,7 +332,12 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
auto css = CollectionShardingState::get(opCtx, args.nss);
if (!args.fromMigrate) {
- css->onUpdateOp(opCtx, args.criteria, args.update, args.updatedDoc);
+ css->onUpdateOp(opCtx,
+ args.criteria,
+ args.update,
+ args.updatedDoc,
+ opTime.writeOpTime.getTimestamp(),
+ opTime.prePostImageOpTime.getTimestamp());
}
if (args.nss.coll() == "system.js") {
@@ -322,11 +347,13 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
} else if (args.nss.ns() == FeatureCompatibilityVersion::kCollection) {
FeatureCompatibilityVersion::onInsertOrUpdate(opCtx, args.updatedDoc);
} else if (args.nss == NamespaceString::kSessionTransactionsTableNamespace &&
- !opTime.isNull()) {
+ !opTime.writeOpTime.isNull()) {
SessionCatalog::get(opCtx)->invalidateSessions(opCtx, args.updatedDoc);
}
- onWriteOpCompleted(opCtx, args.nss, session, std::vector<StmtId>{args.stmtId}, opTime);
+
+ onWriteOpCompleted(
+ opCtx, args.nss, session, std::vector<StmtId>{args.stmtId}, opTime.writeOpTime);
}
auto OpObserverImpl::aboutToDelete(OperationContext* opCtx,
@@ -356,7 +383,10 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
auto css = CollectionShardingState::get(opCtx, nss.ns());
if (!fromMigrate) {
- css->onDeleteOp(opCtx, deleteState);
+ css->onDeleteOp(opCtx,
+ deleteState,
+ opTime.writeOpTime.getTimestamp(),
+ opTime.prePostImageOpTime.getTimestamp());
}
if (nss.coll() == "system.js") {
@@ -365,11 +395,12 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
DurableViewCatalog::onExternalChange(opCtx, nss);
} else if (nss.ns() == FeatureCompatibilityVersion::kCollection) {
FeatureCompatibilityVersion::onDelete(opCtx, deleteState.documentKey);
- } else if (nss == NamespaceString::kSessionTransactionsTableNamespace && !opTime.isNull()) {
+ } else if (nss == NamespaceString::kSessionTransactionsTableNamespace &&
+ !opTime.writeOpTime.isNull()) {
SessionCatalog::get(opCtx)->invalidateSessions(opCtx, deleteState.documentKey);
}
- onWriteOpCompleted(opCtx, nss, session, std::vector<StmtId>{stmtId}, opTime);
+ onWriteOpCompleted(opCtx, nss, session, std::vector<StmtId>{stmtId}, opTime.writeOpTime);
}
void OpObserverImpl::onInternalOpMessage(OperationContext* opCtx,
diff --git a/src/mongo/db/ops/write_ops_retryability.cpp b/src/mongo/db/ops/write_ops_retryability.cpp
index 13d3a1cae56..3033daa68f9 100644
--- a/src/mongo/db/ops/write_ops_retryability.cpp
+++ b/src/mongo/db/ops/write_ops_retryability.cpp
@@ -90,16 +90,6 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request,
<< ", oplog: "
<< redact(oplogEntry.toBSON()),
opType == repl::OpTypeEnum::kUpdate);
- uassert(
- 40610,
- str::stream() << "findAndModify retry request: " << redact(request.toBSON())
- << " is not compatible with previous write in the transaction of type: "
- << OpType_serializer(oplogEntry.getOpType())
- << ", oplogTs: "
- << ts.toString()
- << ", oplog: "
- << redact(oplogEntry.toBSON()),
- !request.isUpsert());
if (request.shouldReturnNew()) {
uassert(40611,
diff --git a/src/mongo/db/ops/write_ops_retryability_test.cpp b/src/mongo/db/ops/write_ops_retryability_test.cpp
index f3415ae2427..a50c51eba58 100644
--- a/src/mongo/db/ops/write_ops_retryability_test.cpp
+++ b/src/mongo/db/ops/write_ops_retryability_test.cpp
@@ -273,23 +273,6 @@ TEST_F(FindAndModifyRetryability, NestedUpsert) {
ASSERT_BSONOBJ_EQ(BSON("x" << 1), result.getValue());
}
-TEST_F(FindAndModifyRetryability, ErrorIfRequestIsUpsertButOplogIsUpdate) {
- auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj());
- request.setUpsert(true);
-
- Timestamp imageTs(120, 3);
- repl::OplogEntry noteOplog(
- repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1));
-
- insertOplogEntry(noteOplog);
-
- repl::OplogEntry oplog(
- repl::OpTime(), 0, repl::OpTypeEnum::kUpdate, kNs, BSON("x" << 1), BSON("y" << 1));
- oplog.setPreImageTs(imageTs);
-
- ASSERT_THROWS(parseOplogEntryForFindAndModify(opCtx(), request, oplog), AssertionException);
-}
-
TEST_F(FindAndModifyRetryability, AttemptingToRetryUpsertWithUpdateWithoutUpsertErrors) {
auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj());
request.setUpsert(false);
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 6fa9c678fbd..f91ae11097b 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -452,6 +452,7 @@ repl::OpTime logInsertOps(OperationContext* opCtx,
Session* session,
std::vector<InsertStatement>::const_iterator begin,
std::vector<InsertStatement>::const_iterator end,
+ Timestamp timestamps[],
bool fromMigrate) {
invariant(begin != end);
@@ -480,7 +481,6 @@ repl::OpTime logInsertOps(OperationContext* opCtx,
oplogLink.prevTs = session->getLastWriteOpTimeTs(*opCtx->getTxnNumber());
}
- auto timestamps = stdx::make_unique<Timestamp[]>(count);
OpTime lastOpTime;
for (size_t i = 0; i < count; i++) {
// Make a mutable copy.
@@ -512,7 +512,7 @@ repl::OpTime logInsertOps(OperationContext* opCtx,
basePtrs[i] = &writers[i];
}
invariant(!lastOpTime.isNull());
- _logOpsInner(opCtx, nss, basePtrs.get(), timestamps.get(), count, oplog, lastOpTime);
+ _logOpsInner(opCtx, nss, basePtrs.get(), timestamps, count, oplog, lastOpTime);
wuow.commit();
return lastOpTime;
}
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index a5eff8ac1aa..34a96b23b05 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -103,6 +103,8 @@ extern int OPLOG_VERSION;
/**
* Log insert(s) to the local oplog.
* Returns the OpTime of the last insert.
+ * The timestamps parameter can also be modified and contain the individual timestamps for each
+ * insert after the oplog entries were created.
*/
OpTime logInsertOps(OperationContext* opCtx,
const NamespaceString& nss,
@@ -110,6 +112,7 @@ OpTime logInsertOps(OperationContext* opCtx,
Session* session,
std::vector<InsertStatement>::const_iterator begin,
std::vector<InsertStatement>::const_iterator end,
+ Timestamp timestamps[],
bool fromMigrate);
/**
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index 9e6477c1d4b..575c442ac74 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -267,7 +267,9 @@ boost::optional<KeyRange> CollectionShardingState::getNextOrphanRange(BSONObj co
return _metadataManager->getNextOrphanRange(from);
}
-void CollectionShardingState::onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc) {
+void CollectionShardingState::onInsertOp(OperationContext* opCtx,
+ const BSONObj& insertedDoc,
+ const Timestamp& oplogTs) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
@@ -291,14 +293,16 @@ void CollectionShardingState::onInsertOp(OperationContext* opCtx, const BSONObj&
checkShardVersionOrThrow(opCtx);
if (_sourceMgr) {
- _sourceMgr->getCloner()->onInsertOp(opCtx, insertedDoc);
+ _sourceMgr->getCloner()->onInsertOp(opCtx, insertedDoc, oplogTs);
}
}
void CollectionShardingState::onUpdateOp(OperationContext* opCtx,
const BSONObj& query,
const BSONObj& update,
- const BSONObj& updatedDoc) {
+ const BSONObj& updatedDoc,
+ const Timestamp& oplogTs,
+ const Timestamp& prePostImageTs) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
@@ -315,7 +319,7 @@ void CollectionShardingState::onUpdateOp(OperationContext* opCtx,
checkShardVersionOrThrow(opCtx);
if (_sourceMgr) {
- _sourceMgr->getCloner()->onUpdateOp(opCtx, updatedDoc);
+ _sourceMgr->getCloner()->onUpdateOp(opCtx, updatedDoc, oplogTs, prePostImageTs);
}
}
@@ -324,7 +328,10 @@ auto CollectionShardingState::makeDeleteState(BSONObj const& doc) -> DeleteState
_sourceMgr && _sourceMgr->getCloner()->isDocumentInMigratingChunk(doc)};
}
-void CollectionShardingState::onDeleteOp(OperationContext* opCtx, const DeleteState& deleteState) {
+void CollectionShardingState::onDeleteOp(OperationContext* opCtx,
+ const DeleteState& deleteState,
+ const Timestamp& oplogTs,
+ const Timestamp& preImageTs) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
@@ -365,7 +372,7 @@ void CollectionShardingState::onDeleteOp(OperationContext* opCtx, const DeleteSt
checkShardVersionOrThrow(opCtx);
if (_sourceMgr && deleteState.isMigrating) {
- _sourceMgr->getCloner()->onDeleteOp(opCtx, deleteState.documentKey);
+ _sourceMgr->getCloner()->onDeleteOp(opCtx, deleteState.documentKey, oplogTs, preImageTs);
}
}
diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h
index 5aa44336f70..6f9722dceb9 100644
--- a/src/mongo/db/s/collection_sharding_state.h
+++ b/src/mongo/db/s/collection_sharding_state.h
@@ -49,6 +49,7 @@ struct ChunkVersion;
class CollectionMetadata;
class MigrationSourceManager;
class OperationContext;
+class Timestamp;
/**
* Contains all sharding-related runtime state for a given collection. One such object is assigned
@@ -223,12 +224,17 @@ public:
*
* The global exclusive lock is expected to be held by the caller of any of these functions.
*/
- void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc);
+ void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc, const Timestamp& oplogTs);
void onUpdateOp(OperationContext* opCtx,
const BSONObj& query,
const BSONObj& update,
- const BSONObj& updatedDoc);
- void onDeleteOp(OperationContext* opCtx, const DeleteState& deleteState);
+ const BSONObj& updatedDoc,
+ const Timestamp& oplogTs,
+ const Timestamp& prePostImageTs);
+ void onDeleteOp(OperationContext* opCtx,
+ const DeleteState& deleteState,
+ const Timestamp& oplogTs,
+ const Timestamp& preImageTs);
void onDropCollection(OperationContext* opCtx, const NamespaceString& collectionName);
private:
diff --git a/src/mongo/db/s/collection_sharding_state_test.cpp b/src/mongo/db/s/collection_sharding_state_test.cpp
index 844727e72c4..9ac9f4a9702 100644
--- a/src/mongo/db/s/collection_sharding_state_test.cpp
+++ b/src/mongo/db/s/collection_sharding_state_test.cpp
@@ -78,7 +78,7 @@ TEST_F(CollShardingStateTest, GlobalInitGetsCalledAfterWriteCommits) {
shardIdentity.setClusterId(OID::gen());
WriteUnitOfWork wuow(operationContext());
- collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON());
+ collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {});
ASSERT_EQ(0, getInitCallCount());
@@ -103,7 +103,7 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetCalledIfWriteAborts) {
{
WriteUnitOfWork wuow(operationContext());
- collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON());
+ collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {});
ASSERT_EQ(0, getInitCallCount());
}
@@ -125,7 +125,7 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfNSIsNotForShardIdentit
shardIdentity.setClusterId(OID::gen());
WriteUnitOfWork wuow(operationContext());
- collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON());
+ collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {});
ASSERT_EQ(0, getInitCallCount());
@@ -144,7 +144,7 @@ TEST_F(CollShardingStateTest, OnInsertOpThrowWithIncompleteShardIdentityDocument
ShardIdentityType shardIdentity;
shardIdentity.setShardName("a");
- ASSERT_THROWS(collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON()),
+ ASSERT_THROWS(collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {}),
AssertionException);
}
@@ -156,7 +156,7 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfShardIdentityDocWasNot
NamespaceString::kServerConfigurationNamespace);
WriteUnitOfWork wuow(operationContext());
- collShardingState.onInsertOp(operationContext(), BSON("_id" << 1));
+ collShardingState.onInsertOp(operationContext(), BSON("_id" << 1), {});
ASSERT_EQ(0, getInitCallCount());
diff --git a/src/mongo/db/s/migration_chunk_cloner_source.h b/src/mongo/db/s/migration_chunk_cloner_source.h
index 750bcdcda80..9d9a1407f1d 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source.h
@@ -36,6 +36,7 @@ namespace mongo {
class BSONObj;
class OperationContext;
class Status;
+class Timestamp;
/**
* This class is responsible for producing chunk documents to be moved from donor to a recipient
@@ -118,7 +119,9 @@ public:
*
* NOTE: Must be called with at least IX lock held on the collection.
*/
- virtual void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc) = 0;
+ virtual void onInsertOp(OperationContext* opCtx,
+ const BSONObj& insertedDoc,
+ const Timestamp& oplogTs) = 0;
/**
* Notifies this cloner that an update happened to the collection, which it owns. It is up to
@@ -127,7 +130,10 @@ public:
*
* NOTE: Must be called with at least IX lock held on the collection.
*/
- virtual void onUpdateOp(OperationContext* opCtx, const BSONObj& updatedDoc) = 0;
+ virtual void onUpdateOp(OperationContext* opCtx,
+ const BSONObj& updatedDoc,
+ const Timestamp& oplogTs,
+ const Timestamp& prePostImageTs) = 0;
/**
* Notifies this cloner that a delede happened to the collection, which it owns. It is up to the
@@ -136,7 +142,10 @@ public:
*
* NOTE: Must be called with at least IX lock held on the collection.
*/
- virtual void onDeleteOp(OperationContext* opCtx, const BSONObj& deletedDocId) = 0;
+ virtual void onDeleteOp(OperationContext* opCtx,
+ const BSONObj& deletedDocId,
+ const Timestamp& oplogTs,
+ const Timestamp& preImageTs) = 0;
protected:
MigrationChunkClonerSource();
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 70a76e5e473..65a75e4c9e0 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -135,8 +135,14 @@ public:
*/
LogOpForShardingHandler(MigrationChunkClonerSourceLegacy* cloner,
const BSONObj& idObj,
- const char op)
- : _cloner(cloner), _idObj(idObj.getOwned()), _op(op) {}
+ const char op,
+ const Timestamp& oplogTs,
+ const Timestamp& prePostImageTs)
+ : _cloner(cloner),
+ _idObj(idObj.getOwned()),
+ _op(op),
+ _oplogTs(oplogTs),
+ _prePostImageTs(prePostImageTs) {}
void commit() override {
switch (_op) {
@@ -156,6 +162,14 @@ public:
default:
MONGO_UNREACHABLE;
}
+
+ if (!_prePostImageTs.isNull()) {
+ _cloner->_sessionCatalogSource.notifyNewWriteTS(_prePostImageTs);
+ }
+
+ if (!_oplogTs.isNull()) {
+ _cloner->_sessionCatalogSource.notifyNewWriteTS(_oplogTs);
+ }
}
void rollback() override {}
@@ -164,6 +178,8 @@ private:
MigrationChunkClonerSourceLegacy* const _cloner;
const BSONObj _idObj;
const char _op;
+ const Timestamp _oplogTs;
+ const Timestamp _prePostImageTs;
};
MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequest request,
@@ -175,7 +191,8 @@ MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequ
_sessionId(MigrationSessionId::generate(_args.getFromShardId().toString(),
_args.getToShardId().toString())),
_donorConnStr(std::move(donorConnStr)),
- _recipientHost(std::move(recipientHost)) {}
+ _recipientHost(std::move(recipientHost)),
+ _sessionCatalogSource(_args.getNss()) {}
MigrationChunkClonerSourceLegacy::~MigrationChunkClonerSourceLegacy() {
invariant(_state == kDone);
@@ -192,6 +209,9 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) {
return storeCurrentLocsStatus;
}
+ // Prime up the session migration source if there are oplog entries to migrate.
+ _sessionCatalogSource.fetchNextOplog(opCtx);
+
// Tell the recipient shard to start cloning
BSONObjBuilder cmdBuilder;
StartChunkCloneRequest::appendAsCommand(&cmdBuilder,
@@ -314,6 +334,12 @@ Status MigrationChunkClonerSourceLegacy::commitClone(OperationContext* opCtx) {
_callRecipient(createRequestWithSessionId(kRecvChunkCommit, _args.getNss(), _sessionId));
if (responseStatus.isOK()) {
_cleanup(opCtx);
+
+ if (_sessionCatalogSource.hasMoreOplog()) {
+ return {ErrorCodes::SessionTransferIncomplete,
+ "destination shard finished committing but there are still some session "
+ "metadata that needs to be transferred"};
+ }
return Status::OK();
}
@@ -344,7 +370,8 @@ bool MigrationChunkClonerSourceLegacy::isDocumentInMigratingChunk(const BSONObj&
}
void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx,
- const BSONObj& insertedDoc) {
+ const BSONObj& insertedDoc,
+ const Timestamp& oplogTs) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX));
BSONElement idElement = insertedDoc["_id"];
@@ -358,11 +385,14 @@ void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx,
return;
}
- opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'i'));
+ opCtx->recoveryUnit()->registerChange(
+ new LogOpForShardingHandler(this, idElement.wrap(), 'i', oplogTs, {}));
}
void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx,
- const BSONObj& updatedDoc) {
+ const BSONObj& updatedDoc,
+ const Timestamp& oplogTs,
+ const Timestamp& prePostImageTs) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX));
BSONElement idElement = updatedDoc["_id"];
@@ -376,11 +406,14 @@ void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx,
return;
}
- opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'u'));
+ opCtx->recoveryUnit()->registerChange(
+ new LogOpForShardingHandler(this, idElement.wrap(), 'u', oplogTs, prePostImageTs));
}
void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx,
- const BSONObj& deletedDocId) {
+ const BSONObj& deletedDocId,
+ const Timestamp& oplogTs,
+ const Timestamp& preImageTs) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX));
BSONElement idElement = deletedDocId["_id"];
@@ -390,7 +423,8 @@ void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx,
return;
}
- opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'd'));
+ opCtx->recoveryUnit()->registerChange(
+ new LogOpForShardingHandler(this, idElement.wrap(), 'd', oplogTs, preImageTs));
}
uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() {
@@ -679,4 +713,27 @@ void MigrationChunkClonerSourceLegacy::_xfer(OperationContext* opCtx,
arr.done();
}
+void MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch(OperationContext* opCtx,
+ BSONArrayBuilder* arrBuilder) {
+ while (_sessionCatalogSource.hasMoreOplog()) {
+ auto oplogDoc = _sessionCatalogSource.getLastFetchedOplog();
+
+ if (oplogDoc.isEmpty()) {
+ // Last fetched turned out empty, try to see if there are more
+ _sessionCatalogSource.fetchNextOplog(opCtx);
+ continue;
+ }
+
+ // Use the builder size instead of accumulating the document sizes directly so that we
+ // take into consideration the overhead of BSONArray indices.
+ if (arrBuilder->arrSize() &&
+ (arrBuilder->len() + oplogDoc.objsize() + 1024) > BSONObjMaxUserSize) {
+ break;
+ }
+
+ arrBuilder->append(oplogDoc);
+ _sessionCatalogSource.fetchNextOplog(opCtx);
+ }
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index c072718475a..72d299b58e4 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -37,6 +37,7 @@
#include "mongo/db/query/plan_executor.h"
#include "mongo/db/s/migration_chunk_cloner_source.h"
#include "mongo/db/s/migration_session_id.h"
+#include "mongo/db/s/session_catalog_migration_source.h"
#include "mongo/s/move_chunk_request.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/stdx/memory.h"
@@ -72,11 +73,19 @@ public:
bool isDocumentInMigratingChunk(const BSONObj& doc) override;
- void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc) override;
+ void onInsertOp(OperationContext* opCtx,
+ const BSONObj& insertedDoc,
+ const Timestamp& oplogTs) override;
- void onUpdateOp(OperationContext* opCtx, const BSONObj& updatedDoc) override;
+ void onUpdateOp(OperationContext* opCtx,
+ const BSONObj& updatedDoc,
+ const Timestamp& oplogTs,
+ const Timestamp& prePostImageTs) override;
- void onDeleteOp(OperationContext* opCtx, const BSONObj& deletedDocId) override;
+ void onDeleteOp(OperationContext* opCtx,
+ const BSONObj& deletedDocId,
+ const Timestamp& oplogTs,
+ const Timestamp& preImageTs) override;
// Legacy cloner specific functionality
@@ -121,6 +130,8 @@ public:
*/
Status nextModsBatch(OperationContext* opCtx, Database* db, BSONObjBuilder* builder);
+ void nextSessionMigrationBatch(OperationContext* opCtx, BSONArrayBuilder* arrBuilder);
+
private:
friend class DeleteNotificationStage;
friend class LogOpForShardingHandler;
@@ -183,6 +194,8 @@ private:
// during the cloning stage
std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> _deleteNotifyExec;
+ SessionCatalogMigrationSource _sessionCatalogSource;
+
// Protects the entries below
stdx::mutex _mutex;
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
index 9e3e83774ee..d42c6bdaa99 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
@@ -216,5 +216,57 @@ public:
} transferModsCommand;
+/**
+ * Command for extracting the oplog entries that needs to be migrated for the given migration
+ * session id.
+ * Note: this command is not stateless. Calling this command has a side-effect of gradually
+ * depleting the buffer that contains the oplog entries to be transfered.
+ */
+class MigrateSessionCommand : public BasicCommand {
+public:
+ MigrateSessionCommand() : BasicCommand("_getNextSessionMods") {}
+
+ void help(std::stringstream& h) const {
+ h << "internal";
+ }
+
+ virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+ return false;
+ }
+
+ virtual bool slaveOk() const {
+ return false;
+ }
+
+ virtual bool adminOnly() const {
+ return true;
+ }
+
+ virtual void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) {
+ ActionSet actions;
+ actions.addAction(ActionType::internal);
+ out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
+ }
+
+ bool run(OperationContext* opCtx,
+ const std::string&,
+ const BSONObj& cmdObj,
+ BSONObjBuilder& result) {
+ const MigrationSessionId migrationSessionId(
+ uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj)));
+
+ BSONArrayBuilder arrBuilder;
+
+ AutoGetActiveCloner autoCloner(opCtx, migrationSessionId);
+ autoCloner.getCloner()->nextSessionMigrationBatch(opCtx, &arrBuilder);
+
+ result.appendArray("oplog", arrBuilder.arr());
+ return true;
+ }
+
+} migrateSessionCommand;
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
index 193c237db5c..a15f6487a33 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
@@ -246,14 +246,14 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, CorrectDocumentsFetched) {
WriteUnitOfWork wuow(operationContext());
- cloner.onInsertOp(operationContext(), createCollectionDocument(90));
- cloner.onInsertOp(operationContext(), createCollectionDocument(150));
- cloner.onInsertOp(operationContext(), createCollectionDocument(151));
- cloner.onInsertOp(operationContext(), createCollectionDocument(210));
-
- cloner.onDeleteOp(operationContext(), createCollectionDocument(80));
- cloner.onDeleteOp(operationContext(), createCollectionDocument(199));
- cloner.onDeleteOp(operationContext(), createCollectionDocument(220));
+ cloner.onInsertOp(operationContext(), createCollectionDocument(90), {});
+ cloner.onInsertOp(operationContext(), createCollectionDocument(150), {});
+ cloner.onInsertOp(operationContext(), createCollectionDocument(151), {});
+ cloner.onInsertOp(operationContext(), createCollectionDocument(210), {});
+
+ cloner.onDeleteOp(operationContext(), createCollectionDocument(80), {}, {});
+ cloner.onDeleteOp(operationContext(), createCollectionDocument(199), {}, {});
+ cloner.onDeleteOp(operationContext(), createCollectionDocument(220), {}, {});
wuow.commit();
}
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index ca2c736a6b1..86688ba08ca 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -57,6 +57,8 @@
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/service_context.h"
#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/grid.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/stdx/chrono.h"
#include "mongo/util/concurrency/notification.h"
@@ -231,6 +233,8 @@ void MigrationDestinationManager::setStateFail(std::string msg) {
_errmsg = std::move(msg);
_state = FAIL;
}
+
+ _sessionMigration->forceFail(msg);
}
void MigrationDestinationManager::setStateFailWarn(std::string msg) {
@@ -240,6 +244,8 @@ void MigrationDestinationManager::setStateFailWarn(std::string msg) {
_errmsg = std::move(msg);
_state = FAIL;
}
+
+ _sessionMigration->forceFail(msg);
}
bool MigrationDestinationManager::isActive() const {
@@ -334,6 +340,9 @@ Status MigrationDestinationManager::start(const NamespaceString& nss,
_migrateThreadHandle.join();
}
+ _sessionMigration =
+ stdx::make_unique<SessionCatalogMigrationDestination>(fromShard, *_sessionId);
+
_migrateThreadHandle =
stdx::thread([this, min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern]() {
_migrateThread(min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern);
@@ -397,6 +406,7 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio
<< _sessionId->toString()};
}
+ _sessionMigration->finish();
_state = COMMIT_START;
auto const deadline = Date_t::now() + Seconds(30);
@@ -411,6 +421,7 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio
if (_state != DONE) {
return {ErrorCodes::CommandFailed, "startCommit failed, final data failed to transfer"};
}
+
return Status::OK();
}
@@ -672,6 +683,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
// 3. Initial bulk clone
setState(CLONE);
+ _sessionMigration->start(opCtx->getServiceContext());
+
const BSONObj migrateCloneRequest = createMigrateCloneRequest(_nss, *_sessionId);
_chunkMarkedPending = true; // no lock needed, only the migrate thread looks.
@@ -829,13 +842,15 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
break;
}
- sleepsecs(1);
+ opCtx->sleepFor(Seconds(1));
}
if (t.minutes() >= 600) {
setStateFail("Cannot go to critical section because secondaries cannot keep up");
return;
}
+
+ _sessionMigration->waitUntilReadyToCommit(opCtx);
}
{
@@ -895,6 +910,12 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep5);
}
+ _sessionMigration->join();
+ if (_sessionMigration->getState() == SessionCatalogMigrationDestination::State::ErrorOccurred) {
+ setStateFail(_sessionMigration->getErrMsg());
+ return;
+ }
+
setState(DONE);
timing.done(6);
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index 610f70196c1..49c6f1e5089 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -39,6 +39,7 @@
#include "mongo/db/s/active_migrations_registry.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/migration_session_id.h"
+#include "mongo/db/s/session_catalog_migration_destination.h"
#include "mongo/s/shard_id.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
@@ -211,6 +212,8 @@ private:
State _state{READY};
std::string _errmsg;
+
+ std::unique_ptr<SessionCatalogMigrationDestination> _sessionMigration;
};
} // namespace mongo
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index c9344031555..1c4c250dac1 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -201,7 +201,6 @@ BSONObj getNextSessionOplogBatch(OperationContext* opCtx,
*/
ProcessOplogResult processSessionOplog(OperationContext* opCtx,
const BSONObj& oplogBSON,
- // const Timestamp& prePostImageTs,
const ProcessOplogResult& lastResult) {
ProcessOplogResult result;
auto oplogEntry = parseOplog(oplogBSON);
@@ -240,50 +239,62 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
const auto& sessionInfo = oplogEntry.getOperationSessionInfo();
result.sessionId = sessionInfo.getSessionId().value();
result.txnNum = sessionInfo.getTxnNumber().value();
+ const auto stmtId = *oplogEntry.getStatementId();
auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, result.sessionId);
scopedSession->beginTxn(opCtx, result.txnNum);
+ if (scopedSession->checkStatementExecuted(opCtx, result.txnNum, stmtId)) {
+ return lastResult;
+ }
+
BSONObj object(result.isPrePostImage
? oplogEntry.getObject()
: BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag << 1));
auto oplogLink = extractPrePostImageTs(lastResult, oplogEntry);
oplogLink.prevTs = scopedSession->getLastWriteOpTimeTs(result.txnNum);
- writeConflictRetry(opCtx,
- "SessionOplogMigration",
- NamespaceString::kSessionTransactionsTableNamespace.ns(),
- [&] {
- Lock::GlobalLock globalLock(opCtx, MODE_IX, UINT_MAX);
- WriteUnitOfWork wunit(opCtx);
-
- result.oplogTime = repl::logOp(opCtx,
- "n",
- oplogEntry.getNamespace(),
- oplogEntry.getUuid(),
- object,
- &object2,
- true,
- sessionInfo,
- *oplogEntry.getStatementId(),
- oplogLink);
-
- auto oplogTs = result.oplogTime.getTimestamp();
- uassert(40633,
- str::stream()
- << "Failed to create new oplog entry for oplog with opTime: "
- << oplogEntry.getOpTime().toString()
- << ": "
- << redact(oplogBSON),
- !oplogTs.isNull());
-
- if (!result.isPrePostImage) {
- scopedSession->onWriteOpCompletedOnPrimary(
- opCtx, result.txnNum, {*oplogEntry.getStatementId()}, oplogTs);
- }
-
- wunit.commit();
- });
+ writeConflictRetry(
+ opCtx,
+ "SessionOplogMigration",
+ NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ [&] {
+ // Need to take global lock here so repl::logOp will not unlock it and trigger the
+ // invariant that disallows unlocking global lock while inside a WUOW.
+ // Grab a DBLock here instead of plain GlobalLock to make sure the MMAPV1 flush
+ // lock will be lock/unlocked correctly. Take the transaction table db lock to
+ // ensure the same lock ordering with normal replicated updates to the table.
+ Lock::DBLock lk(
+ opCtx, NamespaceString::kSessionTransactionsTableNamespace.db(), MODE_IX);
+ WriteUnitOfWork wunit(opCtx);
+
+ result.oplogTime = repl::logOp(opCtx,
+ "n",
+ oplogEntry.getNamespace(),
+ oplogEntry.getUuid(),
+ object,
+ &object2,
+ true,
+ sessionInfo,
+ stmtId,
+ oplogLink);
+
+ auto oplogTs = result.oplogTime.getTimestamp();
+ uassert(40633,
+ str::stream() << "Failed to create new oplog entry for oplog with opTime: "
+ << oplogEntry.getOpTime().toString()
+ << ": "
+ << redact(oplogBSON),
+ !oplogTs.isNull());
+
+ // Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post image, because
+ // the next oplog will contain the real operation.
+ if (!result.isPrePostImage) {
+ scopedSession->onWriteOpCompletedOnPrimary(opCtx, result.txnNum, {stmtId}, oplogTs);
+ }
+
+ wunit.commit();
+ });
return result;
}
@@ -307,6 +318,7 @@ void SessionCatalogMigrationDestination::start(ServiceContext* service) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
invariant(_state == State::NotStarted);
_state = State::Migrating;
+ _isStateChanged.notify_all();
}
_thread = stdx::thread(stdx::bind(
@@ -316,6 +328,7 @@ void SessionCatalogMigrationDestination::start(ServiceContext* service) {
void SessionCatalogMigrationDestination::finish() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
_state = State::Committing;
+ _isStateChanged.notify_all();
}
void SessionCatalogMigrationDestination::join() {
@@ -345,6 +358,7 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
// Timestamp prePostImageTs;
ProcessOplogResult lastResult;
+ repl::OpTime lastOpTimeWaited;
while (true) {
{
@@ -381,8 +395,17 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
// Note: only transition to "ready to commit" if state is not error/force stop.
if (_state == State::Migrating) {
_state = State::ReadyToCommit;
+ _isStateChanged.notify_all();
}
}
+
+ if (lastOpTimeWaited == lastResult.oplogTime) {
+ // We got an empty result at least twice in a row from the source shard so
+ // space it out a little bit so we don't hammer the shard.
+ opCtx->sleepFor(Milliseconds(200));
+ }
+
+ lastOpTimeWaited = lastResult.oplogTime;
}
while (oplogIter.more()) {
@@ -411,6 +434,7 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
{
stdx::lock_guard<stdx::mutex> lk(_mutex);
_state = State::Done;
+ _isStateChanged.notify_all();
}
}
@@ -423,6 +447,8 @@ void SessionCatalogMigrationDestination::_errorOccurred(StringData errMsg) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
_state = State::ErrorOccurred;
_errMsg = errMsg.toString();
+
+ _isStateChanged.notify_all();
}
SessionCatalogMigrationDestination::State SessionCatalogMigrationDestination::getState() {
@@ -430,4 +456,15 @@ SessionCatalogMigrationDestination::State SessionCatalogMigrationDestination::ge
return _state;
}
+void SessionCatalogMigrationDestination::forceFail(std::string& errMsg) {
+ _errorOccurred(errMsg);
+}
+
+void SessionCatalogMigrationDestination::waitUntilReadyToCommit(OperationContext* opCtx) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ while (_state == State::Migrating) {
+ opCtx->waitForConditionOrInterrupt(_isStateChanged, lk);
+ }
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/session_catalog_migration_destination.h b/src/mongo/db/s/session_catalog_migration_destination.h
index 2e740f2f6bb..a67bd8012f6 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.h
+++ b/src/mongo/db/s/session_catalog_migration_destination.h
@@ -37,6 +37,7 @@
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/s/migration_session_id.h"
#include "mongo/s/shard_id.h"
+#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
#include "mongo/util/concurrency/with_lock.h"
@@ -88,6 +89,17 @@ public:
void join();
/**
+ * Forces this into an error state which will also stop session transfer thread.
+ */
+ void forceFail(std::string& errMsg);
+
+ /**
+ * Blocks until state changes is not Migrating. In other words, can return when state
+ * becomes ReadyToCommit/Done/ErrorOccurred, etc.
+ */
+ void waitUntilReadyToCommit(OperationContext* opCtx);
+
+ /**
* Returns the current state.
*/
State getState();
@@ -109,6 +121,7 @@ private:
// Protects _state and _errMsg.
stdx::mutex _mutex;
+ stdx::condition_variable _isStateChanged;
State _state = State::NotStarted;
std::string _errMsg; // valid only if _state == ErrorOccurred.
};
diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
index a1a76dfbd1d..1ec90ca88b1 100644
--- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
@@ -1303,6 +1303,59 @@ TEST_F(SessionCatalogMigrationDestinationTest,
ASSERT_FALSE(sessionMigration.getErrMsg().empty());
}
+TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatements) {
+ const NamespaceString kNs("a.b");
+ const auto sessionId = makeLogicalSessionIdForTest();
+
+ auto opCtx = operationContext();
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(sessionId);
+ sessionInfo.setTxnNumber(19);
+
+ insertDocWithSessionInfo(sessionInfo, kNs, BSON("_id" << 46), 30);
+
+ SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
+ sessionMigration.start(getServiceContext());
+ sessionMigration.finish();
+
+ OplogEntry oplog1(
+ OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
+ oplog1.setOperationSessionInfo(sessionInfo);
+ oplog1.setStatementId(23);
+
+ OplogEntry oplog2(OpTime(Timestamp(70, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
+ oplog2.setOperationSessionInfo(sessionInfo);
+ oplog2.setStatementId(30);
+
+ OplogEntry oplog3(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
+ oplog3.setOperationSessionInfo(sessionInfo);
+ oplog3.setStatementId(45);
+
+ returnOplog({oplog1, oplog2, oplog3});
+ returnOplog({});
+
+ sessionMigration.join();
+
+ ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
+
+ auto session = getSessionWithTxn(opCtx, sessionId, 19);
+ TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(19));
+
+ ASSERT_TRUE(historyIter.hasNext());
+ checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
+
+ ASSERT_TRUE(historyIter.hasNext());
+ checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx));
+
+ ASSERT_TRUE(historyIter.hasNext());
+ auto firstInsertOplog = historyIter.next(opCtx);
+
+ ASSERT_TRUE(firstInsertOplog.getOpType() == OpTypeEnum::kInsert);
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 46), firstInsertOplog.getObject());
+ ASSERT_TRUE(firstInsertOplog.getStatementId());
+ ASSERT_EQ(30, *firstInsertOplog.getStatementId());
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp
index e79656ba94b..829d862b370 100644
--- a/src/mongo/db/s/session_catalog_migration_source.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source.cpp
@@ -76,7 +76,6 @@ BSONObj SessionCatalogMigrationSource::getLastFetchedOplog() {
{
stdx::lock_guard<stdx::mutex> _lk(_newOplogMutex);
- invariant(!_lastFetchedNewWriteOplog.isEmpty());
return _lastFetchedNewWriteOplog;
}
}
@@ -101,11 +100,13 @@ bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationConte
return false;
}
- _lastFetchedOplog = nextOplog.toBSON().getOwned();
-
+ auto nextOplogBSON = nextOplog.toBSON().getOwned();
auto doc = fetchPrePostImageOplog(opCtx, nextOplog);
if (!doc.isEmpty()) {
- _lastFetchedOplogBuffer.push_back(doc);
+ _lastFetchedOplogBuffer.push_back(nextOplogBSON);
+ _lastFetchedOplog = doc;
+ } else {
+ _lastFetchedOplog = nextOplogBSON;
}
return true;
diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h
index 42a3e71e372..5e44531d31c 100644
--- a/src/mongo/db/s/session_catalog_migration_source.h
+++ b/src/mongo/db/s/session_catalog_migration_source.h
@@ -68,6 +68,7 @@ public:
/**
* Returns the oplog document that was last fetched by the fetchNextOplog call.
+ * Returns an empty object if there are no oplog to fetch.
*/
BSONObj getLastFetchedOplog();
diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp
index ad4942292ea..ff43cce5d0b 100644
--- a/src/mongo/db/s/session_catalog_migration_source_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp
@@ -216,7 +216,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd
SessionCatalogMigrationSource migrationSource(kNs);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
- auto expectedSequece = {entry4.toBSON(), entry3.toBSON(), entry2.toBSON(), entry1.toBSON()};
+ auto expectedSequece = {entry3.toBSON(), entry4.toBSON(), entry1.toBSON(), entry2.toBSON()};
for (auto oplogDoc : expectedSequece) {
ASSERT_TRUE(migrationSource.hasMoreOplog());