summaryrefslogtreecommitdiff
path: root/src/mongo/db/s
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--src/mongo/db/s/collection_sharding_state.cpp19
-rw-r--r--src/mongo/db/s/collection_sharding_state.h12
-rw-r--r--src/mongo/db/s/collection_sharding_state_test.cpp10
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source.h15
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp90
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.h19
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp52
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp16
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp23
-rw-r--r--src/mongo/db/s/migration_destination_manager.h3
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination.cpp120
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination.h13
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination_test.cpp79
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.cpp9
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.h1
-rw-r--r--src/mongo/db/s/session_catalog_migration_source_test.cpp2
16 files changed, 403 insertions, 80 deletions
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index 9e6477c1d4b..575c442ac74 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -267,7 +267,9 @@ boost::optional<KeyRange> CollectionShardingState::getNextOrphanRange(BSONObj co
return _metadataManager->getNextOrphanRange(from);
}
-void CollectionShardingState::onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc) {
+void CollectionShardingState::onInsertOp(OperationContext* opCtx,
+ const BSONObj& insertedDoc,
+ const Timestamp& oplogTs) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
@@ -291,14 +293,16 @@ void CollectionShardingState::onInsertOp(OperationContext* opCtx, const BSONObj&
checkShardVersionOrThrow(opCtx);
if (_sourceMgr) {
- _sourceMgr->getCloner()->onInsertOp(opCtx, insertedDoc);
+ _sourceMgr->getCloner()->onInsertOp(opCtx, insertedDoc, oplogTs);
}
}
void CollectionShardingState::onUpdateOp(OperationContext* opCtx,
const BSONObj& query,
const BSONObj& update,
- const BSONObj& updatedDoc) {
+ const BSONObj& updatedDoc,
+ const Timestamp& oplogTs,
+ const Timestamp& prePostImageTs) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
@@ -315,7 +319,7 @@ void CollectionShardingState::onUpdateOp(OperationContext* opCtx,
checkShardVersionOrThrow(opCtx);
if (_sourceMgr) {
- _sourceMgr->getCloner()->onUpdateOp(opCtx, updatedDoc);
+ _sourceMgr->getCloner()->onUpdateOp(opCtx, updatedDoc, oplogTs, prePostImageTs);
}
}
@@ -324,7 +328,10 @@ auto CollectionShardingState::makeDeleteState(BSONObj const& doc) -> DeleteState
_sourceMgr && _sourceMgr->getCloner()->isDocumentInMigratingChunk(doc)};
}
-void CollectionShardingState::onDeleteOp(OperationContext* opCtx, const DeleteState& deleteState) {
+void CollectionShardingState::onDeleteOp(OperationContext* opCtx,
+ const DeleteState& deleteState,
+ const Timestamp& oplogTs,
+ const Timestamp& preImageTs) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
@@ -365,7 +372,7 @@ void CollectionShardingState::onDeleteOp(OperationContext* opCtx, const DeleteSt
checkShardVersionOrThrow(opCtx);
if (_sourceMgr && deleteState.isMigrating) {
- _sourceMgr->getCloner()->onDeleteOp(opCtx, deleteState.documentKey);
+ _sourceMgr->getCloner()->onDeleteOp(opCtx, deleteState.documentKey, oplogTs, preImageTs);
}
}
diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h
index 5aa44336f70..6f9722dceb9 100644
--- a/src/mongo/db/s/collection_sharding_state.h
+++ b/src/mongo/db/s/collection_sharding_state.h
@@ -49,6 +49,7 @@ struct ChunkVersion;
class CollectionMetadata;
class MigrationSourceManager;
class OperationContext;
+class Timestamp;
/**
* Contains all sharding-related runtime state for a given collection. One such object is assigned
@@ -223,12 +224,17 @@ public:
*
* The global exclusive lock is expected to be held by the caller of any of these functions.
*/
- void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc);
+ void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc, const Timestamp& oplogTs);
void onUpdateOp(OperationContext* opCtx,
const BSONObj& query,
const BSONObj& update,
- const BSONObj& updatedDoc);
- void onDeleteOp(OperationContext* opCtx, const DeleteState& deleteState);
+ const BSONObj& updatedDoc,
+ const Timestamp& oplogTs,
+ const Timestamp& prePostImageTs);
+ void onDeleteOp(OperationContext* opCtx,
+ const DeleteState& deleteState,
+ const Timestamp& oplogTs,
+ const Timestamp& preImageTs);
void onDropCollection(OperationContext* opCtx, const NamespaceString& collectionName);
private:
diff --git a/src/mongo/db/s/collection_sharding_state_test.cpp b/src/mongo/db/s/collection_sharding_state_test.cpp
index 844727e72c4..9ac9f4a9702 100644
--- a/src/mongo/db/s/collection_sharding_state_test.cpp
+++ b/src/mongo/db/s/collection_sharding_state_test.cpp
@@ -78,7 +78,7 @@ TEST_F(CollShardingStateTest, GlobalInitGetsCalledAfterWriteCommits) {
shardIdentity.setClusterId(OID::gen());
WriteUnitOfWork wuow(operationContext());
- collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON());
+ collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {});
ASSERT_EQ(0, getInitCallCount());
@@ -103,7 +103,7 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetCalledIfWriteAborts) {
{
WriteUnitOfWork wuow(operationContext());
- collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON());
+ collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {});
ASSERT_EQ(0, getInitCallCount());
}
@@ -125,7 +125,7 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfNSIsNotForShardIdentit
shardIdentity.setClusterId(OID::gen());
WriteUnitOfWork wuow(operationContext());
- collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON());
+ collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {});
ASSERT_EQ(0, getInitCallCount());
@@ -144,7 +144,7 @@ TEST_F(CollShardingStateTest, OnInsertOpThrowWithIncompleteShardIdentityDocument
ShardIdentityType shardIdentity;
shardIdentity.setShardName("a");
- ASSERT_THROWS(collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON()),
+ ASSERT_THROWS(collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {}),
AssertionException);
}
@@ -156,7 +156,7 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfShardIdentityDocWasNot
NamespaceString::kServerConfigurationNamespace);
WriteUnitOfWork wuow(operationContext());
- collShardingState.onInsertOp(operationContext(), BSON("_id" << 1));
+ collShardingState.onInsertOp(operationContext(), BSON("_id" << 1), {});
ASSERT_EQ(0, getInitCallCount());
diff --git a/src/mongo/db/s/migration_chunk_cloner_source.h b/src/mongo/db/s/migration_chunk_cloner_source.h
index 750bcdcda80..9d9a1407f1d 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source.h
@@ -36,6 +36,7 @@ namespace mongo {
class BSONObj;
class OperationContext;
class Status;
+class Timestamp;
/**
* This class is responsible for producing chunk documents to be moved from donor to a recipient
@@ -118,7 +119,9 @@ public:
*
* NOTE: Must be called with at least IX lock held on the collection.
*/
- virtual void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc) = 0;
+ virtual void onInsertOp(OperationContext* opCtx,
+ const BSONObj& insertedDoc,
+ const Timestamp& oplogTs) = 0;
/**
* Notifies this cloner that an update happened to the collection, which it owns. It is up to
@@ -127,7 +130,10 @@ public:
*
* NOTE: Must be called with at least IX lock held on the collection.
*/
- virtual void onUpdateOp(OperationContext* opCtx, const BSONObj& updatedDoc) = 0;
+ virtual void onUpdateOp(OperationContext* opCtx,
+ const BSONObj& updatedDoc,
+ const Timestamp& oplogTs,
+ const Timestamp& prePostImageTs) = 0;
/**
* Notifies this cloner that a delede happened to the collection, which it owns. It is up to the
@@ -136,7 +142,10 @@ public:
*
* NOTE: Must be called with at least IX lock held on the collection.
*/
- virtual void onDeleteOp(OperationContext* opCtx, const BSONObj& deletedDocId) = 0;
+ virtual void onDeleteOp(OperationContext* opCtx,
+ const BSONObj& deletedDocId,
+ const Timestamp& oplogTs,
+ const Timestamp& preImageTs) = 0;
protected:
MigrationChunkClonerSource();
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 70a76e5e473..5d0db5dea0b 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -135,8 +135,14 @@ public:
*/
LogOpForShardingHandler(MigrationChunkClonerSourceLegacy* cloner,
const BSONObj& idObj,
- const char op)
- : _cloner(cloner), _idObj(idObj.getOwned()), _op(op) {}
+ const char op,
+ const Timestamp& oplogTs,
+ const Timestamp& prePostImageTs)
+ : _cloner(cloner),
+ _idObj(idObj.getOwned()),
+ _op(op),
+ _oplogTs(oplogTs),
+ _prePostImageTs(prePostImageTs) {}
void commit() override {
switch (_op) {
@@ -156,6 +162,14 @@ public:
default:
MONGO_UNREACHABLE;
}
+
+ if (!_prePostImageTs.isNull()) {
+ _cloner->_sessionCatalogSource.notifyNewWriteTS(_prePostImageTs);
+ }
+
+ if (!_oplogTs.isNull()) {
+ _cloner->_sessionCatalogSource.notifyNewWriteTS(_oplogTs);
+ }
}
void rollback() override {}
@@ -164,6 +178,8 @@ private:
MigrationChunkClonerSourceLegacy* const _cloner;
const BSONObj _idObj;
const char _op;
+ const Timestamp _oplogTs;
+ const Timestamp _prePostImageTs;
};
MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequest request,
@@ -175,7 +191,8 @@ MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequ
_sessionId(MigrationSessionId::generate(_args.getFromShardId().toString(),
_args.getToShardId().toString())),
_donorConnStr(std::move(donorConnStr)),
- _recipientHost(std::move(recipientHost)) {}
+ _recipientHost(std::move(recipientHost)),
+ _sessionCatalogSource(_args.getNss()) {}
MigrationChunkClonerSourceLegacy::~MigrationChunkClonerSourceLegacy() {
invariant(_state == kDone);
@@ -192,6 +209,9 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) {
return storeCurrentLocsStatus;
}
+ // Prime up the session migration source if there are oplog entries to migrate.
+ _sessionCatalogSource.fetchNextOplog(opCtx);
+
// Tell the recipient shard to start cloning
BSONObjBuilder cmdBuilder;
StartChunkCloneRequest::appendAsCommand(&cmdBuilder,
@@ -314,6 +334,12 @@ Status MigrationChunkClonerSourceLegacy::commitClone(OperationContext* opCtx) {
_callRecipient(createRequestWithSessionId(kRecvChunkCommit, _args.getNss(), _sessionId));
if (responseStatus.isOK()) {
_cleanup(opCtx);
+
+ if (_sessionCatalogSource.hasMoreOplog()) {
+ return {ErrorCodes::SessionTransferIncomplete,
+ "destination shard finished committing but there are still some session "
+ "metadata that needs to be transferred"};
+ }
return Status::OK();
}
@@ -344,7 +370,8 @@ bool MigrationChunkClonerSourceLegacy::isDocumentInMigratingChunk(const BSONObj&
}
void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx,
- const BSONObj& insertedDoc) {
+ const BSONObj& insertedDoc,
+ const Timestamp& oplogTs) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX));
BSONElement idElement = insertedDoc["_id"];
@@ -358,11 +385,19 @@ void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx,
return;
}
- opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'i'));
+ if (opCtx->getTxnNumber()) {
+ opCtx->recoveryUnit()->registerChange(
+ new LogOpForShardingHandler(this, idElement.wrap(), 'i', oplogTs, {}));
+ } else {
+ opCtx->recoveryUnit()->registerChange(
+ new LogOpForShardingHandler(this, idElement.wrap(), 'i', {}, {}));
+ }
}
void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx,
- const BSONObj& updatedDoc) {
+ const BSONObj& updatedDoc,
+ const Timestamp& oplogTs,
+ const Timestamp& prePostImageTs) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX));
BSONElement idElement = updatedDoc["_id"];
@@ -376,11 +411,19 @@ void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx,
return;
}
- opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'u'));
+ if (opCtx->getTxnNumber()) {
+ opCtx->recoveryUnit()->registerChange(
+ new LogOpForShardingHandler(this, idElement.wrap(), 'u', oplogTs, prePostImageTs));
+ } else {
+ opCtx->recoveryUnit()->registerChange(
+ new LogOpForShardingHandler(this, idElement.wrap(), 'u', {}, {}));
+ }
}
void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx,
- const BSONObj& deletedDocId) {
+ const BSONObj& deletedDocId,
+ const Timestamp& oplogTs,
+ const Timestamp& preImageTs) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX));
BSONElement idElement = deletedDocId["_id"];
@@ -390,7 +433,13 @@ void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx,
return;
}
- opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'd'));
+ if (opCtx->getTxnNumber()) {
+ opCtx->recoveryUnit()->registerChange(
+ new LogOpForShardingHandler(this, idElement.wrap(), 'd', oplogTs, preImageTs));
+ } else {
+ opCtx->recoveryUnit()->registerChange(
+ new LogOpForShardingHandler(this, idElement.wrap(), 'd', {}, {}));
+ }
}
uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() {
@@ -679,4 +728,27 @@ void MigrationChunkClonerSourceLegacy::_xfer(OperationContext* opCtx,
arr.done();
}
+void MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch(OperationContext* opCtx,
+ BSONArrayBuilder* arrBuilder) {
+ while (_sessionCatalogSource.hasMoreOplog()) {
+ auto oplogDoc = _sessionCatalogSource.getLastFetchedOplog();
+
+ if (oplogDoc.isEmpty()) {
+ // Last fetched turned out empty, try to see if there are more
+ _sessionCatalogSource.fetchNextOplog(opCtx);
+ continue;
+ }
+
+ // Use the builder size instead of accumulating the document sizes directly so that we
+ // take into consideration the overhead of BSONArray indices.
+ if (arrBuilder->arrSize() &&
+ (arrBuilder->len() + oplogDoc.objsize() + 1024) > BSONObjMaxUserSize) {
+ break;
+ }
+
+ arrBuilder->append(oplogDoc);
+ _sessionCatalogSource.fetchNextOplog(opCtx);
+ }
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index c072718475a..72d299b58e4 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -37,6 +37,7 @@
#include "mongo/db/query/plan_executor.h"
#include "mongo/db/s/migration_chunk_cloner_source.h"
#include "mongo/db/s/migration_session_id.h"
+#include "mongo/db/s/session_catalog_migration_source.h"
#include "mongo/s/move_chunk_request.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/stdx/memory.h"
@@ -72,11 +73,19 @@ public:
bool isDocumentInMigratingChunk(const BSONObj& doc) override;
- void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc) override;
+ void onInsertOp(OperationContext* opCtx,
+ const BSONObj& insertedDoc,
+ const Timestamp& oplogTs) override;
- void onUpdateOp(OperationContext* opCtx, const BSONObj& updatedDoc) override;
+ void onUpdateOp(OperationContext* opCtx,
+ const BSONObj& updatedDoc,
+ const Timestamp& oplogTs,
+ const Timestamp& prePostImageTs) override;
- void onDeleteOp(OperationContext* opCtx, const BSONObj& deletedDocId) override;
+ void onDeleteOp(OperationContext* opCtx,
+ const BSONObj& deletedDocId,
+ const Timestamp& oplogTs,
+ const Timestamp& preImageTs) override;
// Legacy cloner specific functionality
@@ -121,6 +130,8 @@ public:
*/
Status nextModsBatch(OperationContext* opCtx, Database* db, BSONObjBuilder* builder);
+ void nextSessionMigrationBatch(OperationContext* opCtx, BSONArrayBuilder* arrBuilder);
+
private:
friend class DeleteNotificationStage;
friend class LogOpForShardingHandler;
@@ -183,6 +194,8 @@ private:
// during the cloning stage
std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> _deleteNotifyExec;
+ SessionCatalogMigrationSource _sessionCatalogSource;
+
// Protects the entries below
stdx::mutex _mutex;
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
index 9e3e83774ee..d42c6bdaa99 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
@@ -216,5 +216,57 @@ public:
} transferModsCommand;
+/**
+ * Command for extracting the oplog entries that needs to be migrated for the given migration
+ * session id.
+ * Note: this command is not stateless. Calling this command has a side-effect of gradually
+ * depleting the buffer that contains the oplog entries to be transferred.
+ */
+class MigrateSessionCommand : public BasicCommand {
+public:
+ MigrateSessionCommand() : BasicCommand("_getNextSessionMods") {}
+
+ void help(std::stringstream& h) const {
+ h << "internal";
+ }
+
+ virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+ return false;
+ }
+
+ virtual bool slaveOk() const {
+ return false;
+ }
+
+ virtual bool adminOnly() const {
+ return true;
+ }
+
+ virtual void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) {
+ ActionSet actions;
+ actions.addAction(ActionType::internal);
+ out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
+ }
+
+ bool run(OperationContext* opCtx,
+ const std::string&,
+ const BSONObj& cmdObj,
+ BSONObjBuilder& result) {
+ const MigrationSessionId migrationSessionId(
+ uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj)));
+
+ BSONArrayBuilder arrBuilder;
+
+ AutoGetActiveCloner autoCloner(opCtx, migrationSessionId);
+ autoCloner.getCloner()->nextSessionMigrationBatch(opCtx, &arrBuilder);
+
+ result.appendArray("oplog", arrBuilder.arr());
+ return true;
+ }
+
+} migrateSessionCommand;
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
index 193c237db5c..a15f6487a33 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
@@ -246,14 +246,14 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, CorrectDocumentsFetched) {
WriteUnitOfWork wuow(operationContext());
- cloner.onInsertOp(operationContext(), createCollectionDocument(90));
- cloner.onInsertOp(operationContext(), createCollectionDocument(150));
- cloner.onInsertOp(operationContext(), createCollectionDocument(151));
- cloner.onInsertOp(operationContext(), createCollectionDocument(210));
-
- cloner.onDeleteOp(operationContext(), createCollectionDocument(80));
- cloner.onDeleteOp(operationContext(), createCollectionDocument(199));
- cloner.onDeleteOp(operationContext(), createCollectionDocument(220));
+ cloner.onInsertOp(operationContext(), createCollectionDocument(90), {});
+ cloner.onInsertOp(operationContext(), createCollectionDocument(150), {});
+ cloner.onInsertOp(operationContext(), createCollectionDocument(151), {});
+ cloner.onInsertOp(operationContext(), createCollectionDocument(210), {});
+
+ cloner.onDeleteOp(operationContext(), createCollectionDocument(80), {}, {});
+ cloner.onDeleteOp(operationContext(), createCollectionDocument(199), {}, {});
+ cloner.onDeleteOp(operationContext(), createCollectionDocument(220), {}, {});
wuow.commit();
}
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index ca2c736a6b1..86688ba08ca 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -57,6 +57,8 @@
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/service_context.h"
#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/grid.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/stdx/chrono.h"
#include "mongo/util/concurrency/notification.h"
@@ -231,6 +233,8 @@ void MigrationDestinationManager::setStateFail(std::string msg) {
_errmsg = std::move(msg);
_state = FAIL;
}
+
+ _sessionMigration->forceFail(msg);
}
void MigrationDestinationManager::setStateFailWarn(std::string msg) {
@@ -240,6 +244,8 @@ void MigrationDestinationManager::setStateFailWarn(std::string msg) {
_errmsg = std::move(msg);
_state = FAIL;
}
+
+ _sessionMigration->forceFail(msg);
}
bool MigrationDestinationManager::isActive() const {
@@ -334,6 +340,9 @@ Status MigrationDestinationManager::start(const NamespaceString& nss,
_migrateThreadHandle.join();
}
+ _sessionMigration =
+ stdx::make_unique<SessionCatalogMigrationDestination>(fromShard, *_sessionId);
+
_migrateThreadHandle =
stdx::thread([this, min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern]() {
_migrateThread(min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern);
@@ -397,6 +406,7 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio
<< _sessionId->toString()};
}
+ _sessionMigration->finish();
_state = COMMIT_START;
auto const deadline = Date_t::now() + Seconds(30);
@@ -411,6 +421,7 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio
if (_state != DONE) {
return {ErrorCodes::CommandFailed, "startCommit failed, final data failed to transfer"};
}
+
return Status::OK();
}
@@ -672,6 +683,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
// 3. Initial bulk clone
setState(CLONE);
+ _sessionMigration->start(opCtx->getServiceContext());
+
const BSONObj migrateCloneRequest = createMigrateCloneRequest(_nss, *_sessionId);
_chunkMarkedPending = true; // no lock needed, only the migrate thread looks.
@@ -829,13 +842,15 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
break;
}
- sleepsecs(1);
+ opCtx->sleepFor(Seconds(1));
}
if (t.minutes() >= 600) {
setStateFail("Cannot go to critical section because secondaries cannot keep up");
return;
}
+
+ _sessionMigration->waitUntilReadyToCommit(opCtx);
}
{
@@ -895,6 +910,12 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep5);
}
+ _sessionMigration->join();
+ if (_sessionMigration->getState() == SessionCatalogMigrationDestination::State::ErrorOccurred) {
+ setStateFail(_sessionMigration->getErrMsg());
+ return;
+ }
+
setState(DONE);
timing.done(6);
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index 610f70196c1..49c6f1e5089 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -39,6 +39,7 @@
#include "mongo/db/s/active_migrations_registry.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/migration_session_id.h"
+#include "mongo/db/s/session_catalog_migration_destination.h"
#include "mongo/s/shard_id.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
@@ -211,6 +212,8 @@ private:
State _state{READY};
std::string _errmsg;
+
+ std::unique_ptr<SessionCatalogMigrationDestination> _sessionMigration;
};
} // namespace mongo
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index c9344031555..b1b373f17c5 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -201,7 +201,6 @@ BSONObj getNextSessionOplogBatch(OperationContext* opCtx,
*/
ProcessOplogResult processSessionOplog(OperationContext* opCtx,
const BSONObj& oplogBSON,
- // const Timestamp& prePostImageTs,
const ProcessOplogResult& lastResult) {
ProcessOplogResult result;
auto oplogEntry = parseOplog(oplogBSON);
@@ -240,50 +239,62 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
const auto& sessionInfo = oplogEntry.getOperationSessionInfo();
result.sessionId = sessionInfo.getSessionId().value();
result.txnNum = sessionInfo.getTxnNumber().value();
+ const auto stmtId = *oplogEntry.getStatementId();
auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, result.sessionId);
scopedSession->beginTxn(opCtx, result.txnNum);
+ if (scopedSession->checkStatementExecuted(opCtx, result.txnNum, stmtId)) {
+ return lastResult;
+ }
+
BSONObj object(result.isPrePostImage
? oplogEntry.getObject()
: BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag << 1));
auto oplogLink = extractPrePostImageTs(lastResult, oplogEntry);
oplogLink.prevTs = scopedSession->getLastWriteOpTimeTs(result.txnNum);
- writeConflictRetry(opCtx,
- "SessionOplogMigration",
- NamespaceString::kSessionTransactionsTableNamespace.ns(),
- [&] {
- Lock::GlobalLock globalLock(opCtx, MODE_IX, UINT_MAX);
- WriteUnitOfWork wunit(opCtx);
-
- result.oplogTime = repl::logOp(opCtx,
- "n",
- oplogEntry.getNamespace(),
- oplogEntry.getUuid(),
- object,
- &object2,
- true,
- sessionInfo,
- *oplogEntry.getStatementId(),
- oplogLink);
-
- auto oplogTs = result.oplogTime.getTimestamp();
- uassert(40633,
- str::stream()
- << "Failed to create new oplog entry for oplog with opTime: "
- << oplogEntry.getOpTime().toString()
- << ": "
- << redact(oplogBSON),
- !oplogTs.isNull());
-
- if (!result.isPrePostImage) {
- scopedSession->onWriteOpCompletedOnPrimary(
- opCtx, result.txnNum, {*oplogEntry.getStatementId()}, oplogTs);
- }
-
- wunit.commit();
- });
+ writeConflictRetry(
+ opCtx,
+ "SessionOplogMigration",
+ NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ [&] {
+ // Need to take global lock here so repl::logOp will not unlock it and trigger the
+ // invariant that disallows unlocking global lock while inside a WUOW.
+ // Grab a DBLock here instead of plain GlobalLock to make sure the MMAPV1 flush
+ // lock will be locked/unlocked correctly. Take the transaction table db lock to
+ // ensure the same lock ordering with normal replicated updates to the table.
+ Lock::DBLock lk(
+ opCtx, NamespaceString::kSessionTransactionsTableNamespace.db(), MODE_IX);
+ WriteUnitOfWork wunit(opCtx);
+
+ result.oplogTime = repl::logOp(opCtx,
+ "n",
+ oplogEntry.getNamespace(),
+ oplogEntry.getUuid(),
+ object,
+ &object2,
+ true,
+ sessionInfo,
+ stmtId,
+ oplogLink);
+
+ auto oplogTs = result.oplogTime.getTimestamp();
+ uassert(40633,
+ str::stream() << "Failed to create new oplog entry for oplog with opTime: "
+ << oplogEntry.getOpTime().toString()
+ << ": "
+ << redact(oplogBSON),
+ !oplogTs.isNull());
+
+ // Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post image, because
+ // the next oplog will contain the real operation.
+ if (!result.isPrePostImage) {
+ scopedSession->onWriteOpCompletedOnPrimary(opCtx, result.txnNum, {stmtId}, oplogTs);
+ }
+
+ wunit.commit();
+ });
return result;
}
@@ -307,6 +318,7 @@ void SessionCatalogMigrationDestination::start(ServiceContext* service) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
invariant(_state == State::NotStarted);
_state = State::Migrating;
+ _isStateChanged.notify_all();
}
_thread = stdx::thread(stdx::bind(
@@ -316,6 +328,7 @@ void SessionCatalogMigrationDestination::start(ServiceContext* service) {
void SessionCatalogMigrationDestination::finish() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
_state = State::Committing;
+ _isStateChanged.notify_all();
}
void SessionCatalogMigrationDestination::join() {
@@ -343,8 +356,9 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
auto uniqueCtx = cc().makeOperationContext();
auto opCtx = uniqueCtx.get();
- // Timestamp prePostImageTs;
+ bool oplogDrainedAfterCommiting = false;
ProcessOplogResult lastResult;
+ repl::OpTime lastOpTimeWaited;
while (true) {
{
@@ -363,7 +377,16 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
{
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_state == State::Committing) {
- break;
+ // The migration is considered done only when it gets an empty result from
+ // the source shard while this is in state committing. This is to make sure
+ // that it doesn't miss any new oplog created during the window between
+ // depleting the buffer from the source shard and receiving the commit
+ // command.
+ if (oplogDrainedAfterCommiting) {
+ break;
+ }
+
+ oplogDrainedAfterCommiting = true;
}
}
@@ -381,8 +404,17 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
// Note: only transition to "ready to commit" if state is not error/force stop.
if (_state == State::Migrating) {
_state = State::ReadyToCommit;
+ _isStateChanged.notify_all();
}
}
+
+ if (lastOpTimeWaited == lastResult.oplogTime) {
+ // We got an empty result at least twice in a row from the source shard so
+ // space it out a little bit so we don't hammer the shard.
+ opCtx->sleepFor(Milliseconds(200));
+ }
+
+ lastOpTimeWaited = lastResult.oplogTime;
}
while (oplogIter.more()) {
@@ -411,6 +443,7 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
{
stdx::lock_guard<stdx::mutex> lk(_mutex);
_state = State::Done;
+ _isStateChanged.notify_all();
}
}
@@ -423,6 +456,8 @@ void SessionCatalogMigrationDestination::_errorOccurred(StringData errMsg) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
_state = State::ErrorOccurred;
_errMsg = errMsg.toString();
+
+ _isStateChanged.notify_all();
}
SessionCatalogMigrationDestination::State SessionCatalogMigrationDestination::getState() {
@@ -430,4 +465,15 @@ SessionCatalogMigrationDestination::State SessionCatalogMigrationDestination::ge
return _state;
}
+void SessionCatalogMigrationDestination::forceFail(std::string& errMsg) {
+ _errorOccurred(errMsg);
+}
+
+void SessionCatalogMigrationDestination::waitUntilReadyToCommit(OperationContext* opCtx) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ while (_state == State::Migrating) {
+ opCtx->waitForConditionOrInterrupt(_isStateChanged, lk);
+ }
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/session_catalog_migration_destination.h b/src/mongo/db/s/session_catalog_migration_destination.h
index 2e740f2f6bb..a67bd8012f6 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.h
+++ b/src/mongo/db/s/session_catalog_migration_destination.h
@@ -37,6 +37,7 @@
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/s/migration_session_id.h"
#include "mongo/s/shard_id.h"
+#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
#include "mongo/util/concurrency/with_lock.h"
@@ -88,6 +89,17 @@ public:
void join();
/**
+ * Forces this into an error state which will also stop the session transfer thread.
+ */
+ void forceFail(std::string& errMsg);
+
+ /**
+ * Blocks until the state is no longer Migrating. In other words, returns when the state
+ * becomes ReadyToCommit/Done/ErrorOccurred, etc.
+ */
+ void waitUntilReadyToCommit(OperationContext* opCtx);
+
+ /**
* Returns the current state.
*/
State getState();
@@ -109,6 +121,7 @@ private:
// Protects _state and _errMsg.
stdx::mutex _mutex;
+ stdx::condition_variable _isStateChanged;
State _state = State::NotStarted;
std::string _errMsg; // valid only if _state == ErrorOccurred.
};
diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
index a1a76dfbd1d..71c63721c6f 100644
--- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
@@ -253,6 +253,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyWhenNothingToTr
sessionMigration.start(getServiceContext());
sessionMigration.finish();
+ // migration always fetches at least twice to transition from committing to done.
+ returnOplog({});
returnOplog({});
sessionMigration.join();
@@ -285,6 +287,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxn) {
oplog3.setStatementId(5);
returnOplog({oplog1, oplog2, oplog3});
+ // migration always fetches at least twice to transition from committing to done.
+ returnOplog({});
returnOplog({});
sessionMigration.join();
@@ -336,6 +340,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldOnlyStoreHistoryOfLatestTxn
oplog3.setStatementId(5);
returnOplog({oplog1, oplog2, oplog3});
+ // migration always fetches at least twice to transition from committing to done.
+ returnOplog({});
returnOplog({});
sessionMigration.join();
@@ -380,6 +386,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxnInSeparate
returnOplog({oplog1, oplog2});
returnOplog({oplog3});
+ // migration always fetches at least twice to transition from committing to done.
+ returnOplog({});
returnOplog({});
sessionMigration.join();
@@ -433,6 +441,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession)
oplog3.setStatementId(5);
returnOplog({oplog1, oplog2, oplog3});
+ // migration always fetches at least twice to transition from committing to done.
+ returnOplog({});
returnOplog({});
sessionMigration.join();
@@ -506,6 +516,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog)
oplog2.setStatementId(45);
returnOplog({oplog1, oplog2});
+ // migration always fetches at least twice to transition from committing to done.
+ returnOplog({});
returnOplog({});
sessionMigration.join();
@@ -556,6 +568,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindA
updateOplog.setPreImageTs(Timestamp(100, 2));
returnOplog({preImageOplog, updateOplog});
+ // migration always fetches at least twice to transition from committing to done.
+ returnOplog({});
returnOplog({});
sessionMigration.join();
@@ -645,6 +659,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFind
updateOplog.setPostImageTs(Timestamp(100, 2));
returnOplog({postImageOplog, updateOplog});
+ // migration always fetches at least twice to transition from committing to done.
+ returnOplog({});
returnOplog({});
sessionMigration.join();
@@ -735,6 +751,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModify
returnOplog({preImageOplog});
returnOplog({updateOplog});
+ // migration always fetches at least twice to transition from committing to done.
+ returnOplog({});
returnOplog({});
sessionMigration.join();
@@ -829,6 +847,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) {
oplog2.setStatementId(45);
returnOplog({oplog1, oplog2});
+ // migration always fetches at least twice to transition from committing to done.
+ returnOplog({});
returnOplog({});
sessionMigration.join();
@@ -883,6 +903,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwritt
oplog2.setStatementId(45);
returnOplog({oplog2});
+ // migration always fetches at least twice to transition from committing to done.
+ returnOplog({});
returnOplog({});
sessionMigration.join();
@@ -1048,6 +1070,8 @@ TEST_F(SessionCatalogMigrationDestinationTest,
oplog2.setStatementId(45);
returnOplog({oplog2});
+ // migration always fetches at least twice to transition from committing to done.
+ returnOplog({});
returnOplog({});
sessionMigration.join();
@@ -1303,6 +1327,61 @@ TEST_F(SessionCatalogMigrationDestinationTest,
ASSERT_FALSE(sessionMigration.getErrMsg().empty());
}
+TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatements) {
+ const NamespaceString kNs("a.b");
+ const auto sessionId = makeLogicalSessionIdForTest();
+
+ auto opCtx = operationContext();
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(sessionId);
+ sessionInfo.setTxnNumber(19);
+
+ insertDocWithSessionInfo(sessionInfo, kNs, BSON("_id" << 46), 30);
+
+ SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
+ sessionMigration.start(getServiceContext());
+ sessionMigration.finish();
+
+ OplogEntry oplog1(
+ OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
+ oplog1.setOperationSessionInfo(sessionInfo);
+ oplog1.setStatementId(23);
+
+ OplogEntry oplog2(OpTime(Timestamp(70, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
+ oplog2.setOperationSessionInfo(sessionInfo);
+ oplog2.setStatementId(30);
+
+ OplogEntry oplog3(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
+ oplog3.setOperationSessionInfo(sessionInfo);
+ oplog3.setStatementId(45);
+
+ returnOplog({oplog1, oplog2, oplog3});
+ // migration always fetches at least twice to transition from committing to done.
+ returnOplog({});
+ returnOplog({});
+
+ sessionMigration.join();
+
+ ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
+
+ auto session = getSessionWithTxn(opCtx, sessionId, 19);
+ TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(19));
+
+ ASSERT_TRUE(historyIter.hasNext());
+ checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
+
+ ASSERT_TRUE(historyIter.hasNext());
+ checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx));
+
+ ASSERT_TRUE(historyIter.hasNext());
+ auto firstInsertOplog = historyIter.next(opCtx);
+
+ ASSERT_TRUE(firstInsertOplog.getOpType() == OpTypeEnum::kInsert);
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 46), firstInsertOplog.getObject());
+ ASSERT_TRUE(firstInsertOplog.getStatementId());
+ ASSERT_EQ(30, *firstInsertOplog.getStatementId());
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp
index e79656ba94b..829d862b370 100644
--- a/src/mongo/db/s/session_catalog_migration_source.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source.cpp
@@ -76,7 +76,6 @@ BSONObj SessionCatalogMigrationSource::getLastFetchedOplog() {
{
stdx::lock_guard<stdx::mutex> _lk(_newOplogMutex);
- invariant(!_lastFetchedNewWriteOplog.isEmpty());
return _lastFetchedNewWriteOplog;
}
}
@@ -101,11 +100,13 @@ bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationConte
return false;
}
- _lastFetchedOplog = nextOplog.toBSON().getOwned();
-
+ auto nextOplogBSON = nextOplog.toBSON().getOwned();
auto doc = fetchPrePostImageOplog(opCtx, nextOplog);
if (!doc.isEmpty()) {
- _lastFetchedOplogBuffer.push_back(doc);
+ _lastFetchedOplogBuffer.push_back(nextOplogBSON);
+ _lastFetchedOplog = doc;
+ } else {
+ _lastFetchedOplog = nextOplogBSON;
}
return true;
diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h
index 42a3e71e372..5e44531d31c 100644
--- a/src/mongo/db/s/session_catalog_migration_source.h
+++ b/src/mongo/db/s/session_catalog_migration_source.h
@@ -68,6 +68,7 @@ public:
/**
* Returns the oplog document that was last fetched by the fetchNextOplog call.
+ * Returns an empty object if there are no oplog to fetch.
*/
BSONObj getLastFetchedOplog();
diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp
index ad4942292ea..ff43cce5d0b 100644
--- a/src/mongo/db/s/session_catalog_migration_source_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp
@@ -216,7 +216,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd
SessionCatalogMigrationSource migrationSource(kNs);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
- auto expectedSequece = {entry4.toBSON(), entry3.toBSON(), entry2.toBSON(), entry1.toBSON()};
+ auto expectedSequece = {entry3.toBSON(), entry4.toBSON(), entry1.toBSON(), entry2.toBSON()};
for (auto oplogDoc : expectedSequece) {
ASSERT_TRUE(migrationSource.hasMoreOplog());