summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2017-09-28 10:45:49 -0400
committerRandolph Tan <randolph@10gen.com>2017-09-28 10:45:49 -0400
commitf24fbb0011c6ded9101f08574e7cd07e63690a9b (patch)
treee1452828e142748f1f03be61a00c32dbb3ed6bc1 /src
parentd293f6857bcb36b26ca8fa03d90299714fe060de (diff)
downloadmongo-f24fbb0011c6ded9101f08574e7cd07e63690a9b.tar.gz
Revert "Revert "Revert "SERVER-30894 Implement command for transferring session information during migration"""
This reverts commit aaa0c96532ba6a8ea9146e4298c6bf1cc6b27f9e.
Diffstat (limited to 'src')
-rw-r--r--src/mongo/base/error_codes.err1
-rw-r--r--src/mongo/db/op_observer_impl.cpp95
-rw-r--r--src/mongo/db/ops/write_ops_retryability.cpp10
-rw-r--r--src/mongo/db/ops/write_ops_retryability_test.cpp17
-rw-r--r--src/mongo/db/repl/oplog.cpp4
-rw-r--r--src/mongo/db/repl/oplog.h3
-rw-r--r--src/mongo/db/s/collection_sharding_state.cpp19
-rw-r--r--src/mongo/db/s/collection_sharding_state.h12
-rw-r--r--src/mongo/db/s/collection_sharding_state_test.cpp10
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source.h15
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp75
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.h19
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp52
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp16
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp23
-rw-r--r--src/mongo/db/s/migration_destination_manager.h3
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination.cpp107
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination.h13
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination_test.cpp53
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.cpp9
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.h1
-rw-r--r--src/mongo/db/s/session_catalog_migration_source_test.cpp2
22 files changed, 139 insertions, 420 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index 24825fb09ba..80a17b9ccf1 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -225,7 +225,6 @@ error_code("JSONSchemaNotAllowed", 224)
error_code("TransactionTooOld", 225)
error_code("AtomicityFailure", 226)
error_code("CannotImplicitlyCreateCollection", 227);
-error_code("SessionTransferIncomplete", 228)
# Error codes 4000-8999 are reserved.
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index b466b70c3bb..9a6bd8e5be7 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -115,15 +115,10 @@ BSONObj makeCollModCmdObj(const BSONObj& collModCmd,
return cmdObjBuilder.obj();
}
-struct OpTimeBundle {
- repl::OpTime writeOpTime;
- repl::OpTime prePostImageOpTime;
-};
-
/**
* Write oplog entry(ies) for the update operation.
*/
-OpTimeBundle replLogUpdate(OperationContext* opCtx,
+repl::OpTime replLogUpdate(OperationContext* opCtx,
Session* session,
const OplogUpdateEntryArgs& args) {
BSONObj storeObj;
@@ -143,8 +138,6 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx,
oplogLink.prevTs = session->getLastWriteOpTimeTs(*opCtx->getTxnNumber());
}
- OpTimeBundle opTimes;
-
if (!storeObj.isEmpty() && opCtx->getTxnNumber()) {
auto noteUpdateOpTime = repl::logOp(opCtx,
"n",
@@ -157,8 +150,6 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx,
args.stmtId,
{});
- opTimes.prePostImageOpTime = noteUpdateOpTime;
-
if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PreImage) {
oplogLink.preImageTs = noteUpdateOpTime.getTimestamp();
} else if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PostImage) {
@@ -166,24 +157,22 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx,
}
}
- opTimes.writeOpTime = repl::logOp(opCtx,
- "u",
- args.nss,
- args.uuid,
- args.update,
- &args.criteria,
- args.fromMigrate,
- sessionInfo,
- args.stmtId,
- oplogLink);
-
- return opTimes;
+ return repl::logOp(opCtx,
+ "u",
+ args.nss,
+ args.uuid,
+ args.update,
+ &args.criteria,
+ args.fromMigrate,
+ sessionInfo,
+ args.stmtId,
+ oplogLink);
}
/**
* Write oplog entry(ies) for the delete operation.
*/
-OpTimeBundle replLogDelete(OperationContext* opCtx,
+repl::OpTime replLogDelete(OperationContext* opCtx,
const NamespaceString& nss,
OptionalCollectionUUID uuid,
Session* session,
@@ -200,26 +189,22 @@ OpTimeBundle replLogDelete(OperationContext* opCtx,
oplogLink.prevTs = session->getLastWriteOpTimeTs(*opCtx->getTxnNumber());
}
- OpTimeBundle opTimes;
-
if (deletedDoc && opCtx->getTxnNumber()) {
auto noteOplog = repl::logOp(
opCtx, "n", nss, uuid, deletedDoc.get(), nullptr, false, sessionInfo, stmtId, {});
- opTimes.prePostImageOpTime = noteOplog;
oplogLink.preImageTs = noteOplog.getTimestamp();
}
- opTimes.writeOpTime = repl::logOp(opCtx,
- "d",
- nss,
- uuid,
- deleteState.documentKey,
- nullptr,
- fromMigrate,
- sessionInfo,
- stmtId,
- oplogLink);
- return opTimes;
+ return repl::logOp(opCtx,
+ "d",
+ nss,
+ uuid,
+ deleteState.documentKey,
+ nullptr,
+ fromMigrate,
+ sessionInfo,
+ stmtId,
+ oplogLink);
}
} // namespace
@@ -268,7 +253,7 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx,
auto css = CollectionShardingState::get(opCtx, systemIndexes);
if (!fromMigrate) {
- css->onInsertOp(opCtx, indexDoc, {});
+ css->onInsertOp(opCtx, indexDoc);
}
}
@@ -279,20 +264,15 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
std::vector<InsertStatement>::const_iterator end,
bool fromMigrate) {
Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr;
-
- const size_t count = end - begin;
- auto timestamps = stdx::make_unique<Timestamp[]>(count);
- const auto lastOpTime =
- repl::logInsertOps(opCtx, nss, uuid, session, begin, end, timestamps.get(), fromMigrate);
+ const auto lastOpTime = repl::logInsertOps(opCtx, nss, uuid, session, begin, end, fromMigrate);
auto css = CollectionShardingState::get(opCtx, nss.ns());
- size_t index = 0;
- for (auto it = begin; it != end; it++, index++) {
+ for (auto it = begin; it != end; it++) {
AuthorizationManager::get(opCtx->getServiceContext())
->logOp(opCtx, "i", nss, it->doc, nullptr);
if (!fromMigrate) {
- css->onInsertOp(opCtx, it->doc, timestamps[index]);
+ css->onInsertOp(opCtx, it->doc);
}
}
@@ -332,12 +312,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
auto css = CollectionShardingState::get(opCtx, args.nss);
if (!args.fromMigrate) {
- css->onUpdateOp(opCtx,
- args.criteria,
- args.update,
- args.updatedDoc,
- opTime.writeOpTime.getTimestamp(),
- opTime.prePostImageOpTime.getTimestamp());
+ css->onUpdateOp(opCtx, args.criteria, args.update, args.updatedDoc);
}
if (args.nss.coll() == "system.js") {
@@ -347,13 +322,11 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
} else if (args.nss.ns() == FeatureCompatibilityVersion::kCollection) {
FeatureCompatibilityVersion::onInsertOrUpdate(opCtx, args.updatedDoc);
} else if (args.nss == NamespaceString::kSessionTransactionsTableNamespace &&
- !opTime.writeOpTime.isNull()) {
+ !opTime.isNull()) {
SessionCatalog::get(opCtx)->invalidateSessions(opCtx, args.updatedDoc);
}
-
- onWriteOpCompleted(
- opCtx, args.nss, session, std::vector<StmtId>{args.stmtId}, opTime.writeOpTime);
+ onWriteOpCompleted(opCtx, args.nss, session, std::vector<StmtId>{args.stmtId}, opTime);
}
auto OpObserverImpl::aboutToDelete(OperationContext* opCtx,
@@ -383,10 +356,7 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
auto css = CollectionShardingState::get(opCtx, nss.ns());
if (!fromMigrate) {
- css->onDeleteOp(opCtx,
- deleteState,
- opTime.writeOpTime.getTimestamp(),
- opTime.prePostImageOpTime.getTimestamp());
+ css->onDeleteOp(opCtx, deleteState);
}
if (nss.coll() == "system.js") {
@@ -395,12 +365,11 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
DurableViewCatalog::onExternalChange(opCtx, nss);
} else if (nss.ns() == FeatureCompatibilityVersion::kCollection) {
FeatureCompatibilityVersion::onDelete(opCtx, deleteState.documentKey);
- } else if (nss == NamespaceString::kSessionTransactionsTableNamespace &&
- !opTime.writeOpTime.isNull()) {
+ } else if (nss == NamespaceString::kSessionTransactionsTableNamespace && !opTime.isNull()) {
SessionCatalog::get(opCtx)->invalidateSessions(opCtx, deleteState.documentKey);
}
- onWriteOpCompleted(opCtx, nss, session, std::vector<StmtId>{stmtId}, opTime.writeOpTime);
+ onWriteOpCompleted(opCtx, nss, session, std::vector<StmtId>{stmtId}, opTime);
}
void OpObserverImpl::onInternalOpMessage(OperationContext* opCtx,
diff --git a/src/mongo/db/ops/write_ops_retryability.cpp b/src/mongo/db/ops/write_ops_retryability.cpp
index 3033daa68f9..13d3a1cae56 100644
--- a/src/mongo/db/ops/write_ops_retryability.cpp
+++ b/src/mongo/db/ops/write_ops_retryability.cpp
@@ -90,6 +90,16 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request,
<< ", oplog: "
<< redact(oplogEntry.toBSON()),
opType == repl::OpTypeEnum::kUpdate);
+ uassert(
+ 40610,
+ str::stream() << "findAndModify retry request: " << redact(request.toBSON())
+ << " is not compatible with previous write in the transaction of type: "
+ << OpType_serializer(oplogEntry.getOpType())
+ << ", oplogTs: "
+ << ts.toString()
+ << ", oplog: "
+ << redact(oplogEntry.toBSON()),
+ !request.isUpsert());
if (request.shouldReturnNew()) {
uassert(40611,
diff --git a/src/mongo/db/ops/write_ops_retryability_test.cpp b/src/mongo/db/ops/write_ops_retryability_test.cpp
index a50c51eba58..f3415ae2427 100644
--- a/src/mongo/db/ops/write_ops_retryability_test.cpp
+++ b/src/mongo/db/ops/write_ops_retryability_test.cpp
@@ -273,6 +273,23 @@ TEST_F(FindAndModifyRetryability, NestedUpsert) {
ASSERT_BSONOBJ_EQ(BSON("x" << 1), result.getValue());
}
+TEST_F(FindAndModifyRetryability, ErrorIfRequestIsUpsertButOplogIsUpdate) {
+ auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj());
+ request.setUpsert(true);
+
+ Timestamp imageTs(120, 3);
+ repl::OplogEntry noteOplog(
+ repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1));
+
+ insertOplogEntry(noteOplog);
+
+ repl::OplogEntry oplog(
+ repl::OpTime(), 0, repl::OpTypeEnum::kUpdate, kNs, BSON("x" << 1), BSON("y" << 1));
+ oplog.setPreImageTs(imageTs);
+
+ ASSERT_THROWS(parseOplogEntryForFindAndModify(opCtx(), request, oplog), AssertionException);
+}
+
TEST_F(FindAndModifyRetryability, AttemptingToRetryUpsertWithUpdateWithoutUpsertErrors) {
auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj());
request.setUpsert(false);
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index f91ae11097b..6fa9c678fbd 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -452,7 +452,6 @@ repl::OpTime logInsertOps(OperationContext* opCtx,
Session* session,
std::vector<InsertStatement>::const_iterator begin,
std::vector<InsertStatement>::const_iterator end,
- Timestamp timestamps[],
bool fromMigrate) {
invariant(begin != end);
@@ -481,6 +480,7 @@ repl::OpTime logInsertOps(OperationContext* opCtx,
oplogLink.prevTs = session->getLastWriteOpTimeTs(*opCtx->getTxnNumber());
}
+ auto timestamps = stdx::make_unique<Timestamp[]>(count);
OpTime lastOpTime;
for (size_t i = 0; i < count; i++) {
// Make a mutable copy.
@@ -512,7 +512,7 @@ repl::OpTime logInsertOps(OperationContext* opCtx,
basePtrs[i] = &writers[i];
}
invariant(!lastOpTime.isNull());
- _logOpsInner(opCtx, nss, basePtrs.get(), timestamps, count, oplog, lastOpTime);
+ _logOpsInner(opCtx, nss, basePtrs.get(), timestamps.get(), count, oplog, lastOpTime);
wuow.commit();
return lastOpTime;
}
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index 34a96b23b05..a5eff8ac1aa 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -103,8 +103,6 @@ extern int OPLOG_VERSION;
/**
* Log insert(s) to the local oplog.
* Returns the OpTime of the last insert.
- * The timestamps parameter can also be modified and contain the individual timestamps for each
- * insert after the oplog entries were created.
*/
OpTime logInsertOps(OperationContext* opCtx,
const NamespaceString& nss,
@@ -112,7 +110,6 @@ OpTime logInsertOps(OperationContext* opCtx,
Session* session,
std::vector<InsertStatement>::const_iterator begin,
std::vector<InsertStatement>::const_iterator end,
- Timestamp timestamps[],
bool fromMigrate);
/**
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index 575c442ac74..9e6477c1d4b 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -267,9 +267,7 @@ boost::optional<KeyRange> CollectionShardingState::getNextOrphanRange(BSONObj co
return _metadataManager->getNextOrphanRange(from);
}
-void CollectionShardingState::onInsertOp(OperationContext* opCtx,
- const BSONObj& insertedDoc,
- const Timestamp& oplogTs) {
+void CollectionShardingState::onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
@@ -293,16 +291,14 @@ void CollectionShardingState::onInsertOp(OperationContext* opCtx,
checkShardVersionOrThrow(opCtx);
if (_sourceMgr) {
- _sourceMgr->getCloner()->onInsertOp(opCtx, insertedDoc, oplogTs);
+ _sourceMgr->getCloner()->onInsertOp(opCtx, insertedDoc);
}
}
void CollectionShardingState::onUpdateOp(OperationContext* opCtx,
const BSONObj& query,
const BSONObj& update,
- const BSONObj& updatedDoc,
- const Timestamp& oplogTs,
- const Timestamp& prePostImageTs) {
+ const BSONObj& updatedDoc) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
@@ -319,7 +315,7 @@ void CollectionShardingState::onUpdateOp(OperationContext* opCtx,
checkShardVersionOrThrow(opCtx);
if (_sourceMgr) {
- _sourceMgr->getCloner()->onUpdateOp(opCtx, updatedDoc, oplogTs, prePostImageTs);
+ _sourceMgr->getCloner()->onUpdateOp(opCtx, updatedDoc);
}
}
@@ -328,10 +324,7 @@ auto CollectionShardingState::makeDeleteState(BSONObj const& doc) -> DeleteState
_sourceMgr && _sourceMgr->getCloner()->isDocumentInMigratingChunk(doc)};
}
-void CollectionShardingState::onDeleteOp(OperationContext* opCtx,
- const DeleteState& deleteState,
- const Timestamp& oplogTs,
- const Timestamp& preImageTs) {
+void CollectionShardingState::onDeleteOp(OperationContext* opCtx, const DeleteState& deleteState) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
@@ -372,7 +365,7 @@ void CollectionShardingState::onDeleteOp(OperationContext* opCtx,
checkShardVersionOrThrow(opCtx);
if (_sourceMgr && deleteState.isMigrating) {
- _sourceMgr->getCloner()->onDeleteOp(opCtx, deleteState.documentKey, oplogTs, preImageTs);
+ _sourceMgr->getCloner()->onDeleteOp(opCtx, deleteState.documentKey);
}
}
diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h
index 6f9722dceb9..5aa44336f70 100644
--- a/src/mongo/db/s/collection_sharding_state.h
+++ b/src/mongo/db/s/collection_sharding_state.h
@@ -49,7 +49,6 @@ struct ChunkVersion;
class CollectionMetadata;
class MigrationSourceManager;
class OperationContext;
-class Timestamp;
/**
* Contains all sharding-related runtime state for a given collection. One such object is assigned
@@ -224,17 +223,12 @@ public:
*
* The global exclusive lock is expected to be held by the caller of any of these functions.
*/
- void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc, const Timestamp& oplogTs);
+ void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc);
void onUpdateOp(OperationContext* opCtx,
const BSONObj& query,
const BSONObj& update,
- const BSONObj& updatedDoc,
- const Timestamp& oplogTs,
- const Timestamp& prePostImageTs);
- void onDeleteOp(OperationContext* opCtx,
- const DeleteState& deleteState,
- const Timestamp& oplogTs,
- const Timestamp& preImageTs);
+ const BSONObj& updatedDoc);
+ void onDeleteOp(OperationContext* opCtx, const DeleteState& deleteState);
void onDropCollection(OperationContext* opCtx, const NamespaceString& collectionName);
private:
diff --git a/src/mongo/db/s/collection_sharding_state_test.cpp b/src/mongo/db/s/collection_sharding_state_test.cpp
index 9ac9f4a9702..844727e72c4 100644
--- a/src/mongo/db/s/collection_sharding_state_test.cpp
+++ b/src/mongo/db/s/collection_sharding_state_test.cpp
@@ -78,7 +78,7 @@ TEST_F(CollShardingStateTest, GlobalInitGetsCalledAfterWriteCommits) {
shardIdentity.setClusterId(OID::gen());
WriteUnitOfWork wuow(operationContext());
- collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {});
+ collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON());
ASSERT_EQ(0, getInitCallCount());
@@ -103,7 +103,7 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetCalledIfWriteAborts) {
{
WriteUnitOfWork wuow(operationContext());
- collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {});
+ collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON());
ASSERT_EQ(0, getInitCallCount());
}
@@ -125,7 +125,7 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfNSIsNotForShardIdentit
shardIdentity.setClusterId(OID::gen());
WriteUnitOfWork wuow(operationContext());
- collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {});
+ collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON());
ASSERT_EQ(0, getInitCallCount());
@@ -144,7 +144,7 @@ TEST_F(CollShardingStateTest, OnInsertOpThrowWithIncompleteShardIdentityDocument
ShardIdentityType shardIdentity;
shardIdentity.setShardName("a");
- ASSERT_THROWS(collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {}),
+ ASSERT_THROWS(collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON()),
AssertionException);
}
@@ -156,7 +156,7 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfShardIdentityDocWasNot
NamespaceString::kServerConfigurationNamespace);
WriteUnitOfWork wuow(operationContext());
- collShardingState.onInsertOp(operationContext(), BSON("_id" << 1), {});
+ collShardingState.onInsertOp(operationContext(), BSON("_id" << 1));
ASSERT_EQ(0, getInitCallCount());
diff --git a/src/mongo/db/s/migration_chunk_cloner_source.h b/src/mongo/db/s/migration_chunk_cloner_source.h
index 9d9a1407f1d..750bcdcda80 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source.h
@@ -36,7 +36,6 @@ namespace mongo {
class BSONObj;
class OperationContext;
class Status;
-class Timestamp;
/**
* This class is responsible for producing chunk documents to be moved from donor to a recipient
@@ -119,9 +118,7 @@ public:
*
* NOTE: Must be called with at least IX lock held on the collection.
*/
- virtual void onInsertOp(OperationContext* opCtx,
- const BSONObj& insertedDoc,
- const Timestamp& oplogTs) = 0;
+ virtual void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc) = 0;
/**
* Notifies this cloner that an update happened to the collection, which it owns. It is up to
@@ -130,10 +127,7 @@ public:
*
* NOTE: Must be called with at least IX lock held on the collection.
*/
- virtual void onUpdateOp(OperationContext* opCtx,
- const BSONObj& updatedDoc,
- const Timestamp& oplogTs,
- const Timestamp& prePostImageTs) = 0;
+ virtual void onUpdateOp(OperationContext* opCtx, const BSONObj& updatedDoc) = 0;
/**
* Notifies this cloner that a delede happened to the collection, which it owns. It is up to the
@@ -142,10 +136,7 @@ public:
*
* NOTE: Must be called with at least IX lock held on the collection.
*/
- virtual void onDeleteOp(OperationContext* opCtx,
- const BSONObj& deletedDocId,
- const Timestamp& oplogTs,
- const Timestamp& preImageTs) = 0;
+ virtual void onDeleteOp(OperationContext* opCtx, const BSONObj& deletedDocId) = 0;
protected:
MigrationChunkClonerSource();
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 65a75e4c9e0..70a76e5e473 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -135,14 +135,8 @@ public:
*/
LogOpForShardingHandler(MigrationChunkClonerSourceLegacy* cloner,
const BSONObj& idObj,
- const char op,
- const Timestamp& oplogTs,
- const Timestamp& prePostImageTs)
- : _cloner(cloner),
- _idObj(idObj.getOwned()),
- _op(op),
- _oplogTs(oplogTs),
- _prePostImageTs(prePostImageTs) {}
+ const char op)
+ : _cloner(cloner), _idObj(idObj.getOwned()), _op(op) {}
void commit() override {
switch (_op) {
@@ -162,14 +156,6 @@ public:
default:
MONGO_UNREACHABLE;
}
-
- if (!_prePostImageTs.isNull()) {
- _cloner->_sessionCatalogSource.notifyNewWriteTS(_prePostImageTs);
- }
-
- if (!_oplogTs.isNull()) {
- _cloner->_sessionCatalogSource.notifyNewWriteTS(_oplogTs);
- }
}
void rollback() override {}
@@ -178,8 +164,6 @@ private:
MigrationChunkClonerSourceLegacy* const _cloner;
const BSONObj _idObj;
const char _op;
- const Timestamp _oplogTs;
- const Timestamp _prePostImageTs;
};
MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequest request,
@@ -191,8 +175,7 @@ MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequ
_sessionId(MigrationSessionId::generate(_args.getFromShardId().toString(),
_args.getToShardId().toString())),
_donorConnStr(std::move(donorConnStr)),
- _recipientHost(std::move(recipientHost)),
- _sessionCatalogSource(_args.getNss()) {}
+ _recipientHost(std::move(recipientHost)) {}
MigrationChunkClonerSourceLegacy::~MigrationChunkClonerSourceLegacy() {
invariant(_state == kDone);
@@ -209,9 +192,6 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) {
return storeCurrentLocsStatus;
}
- // Prime up the session migration source if there are oplog entries to migrate.
- _sessionCatalogSource.fetchNextOplog(opCtx);
-
// Tell the recipient shard to start cloning
BSONObjBuilder cmdBuilder;
StartChunkCloneRequest::appendAsCommand(&cmdBuilder,
@@ -334,12 +314,6 @@ Status MigrationChunkClonerSourceLegacy::commitClone(OperationContext* opCtx) {
_callRecipient(createRequestWithSessionId(kRecvChunkCommit, _args.getNss(), _sessionId));
if (responseStatus.isOK()) {
_cleanup(opCtx);
-
- if (_sessionCatalogSource.hasMoreOplog()) {
- return {ErrorCodes::SessionTransferIncomplete,
- "destination shard finished committing but there are still some session "
- "metadata that needs to be transferred"};
- }
return Status::OK();
}
@@ -370,8 +344,7 @@ bool MigrationChunkClonerSourceLegacy::isDocumentInMigratingChunk(const BSONObj&
}
void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx,
- const BSONObj& insertedDoc,
- const Timestamp& oplogTs) {
+ const BSONObj& insertedDoc) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX));
BSONElement idElement = insertedDoc["_id"];
@@ -385,14 +358,11 @@ void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx,
return;
}
- opCtx->recoveryUnit()->registerChange(
- new LogOpForShardingHandler(this, idElement.wrap(), 'i', oplogTs, {}));
+ opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'i'));
}
void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx,
- const BSONObj& updatedDoc,
- const Timestamp& oplogTs,
- const Timestamp& prePostImageTs) {
+ const BSONObj& updatedDoc) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX));
BSONElement idElement = updatedDoc["_id"];
@@ -406,14 +376,11 @@ void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx,
return;
}
- opCtx->recoveryUnit()->registerChange(
- new LogOpForShardingHandler(this, idElement.wrap(), 'u', oplogTs, prePostImageTs));
+ opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'u'));
}
void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx,
- const BSONObj& deletedDocId,
- const Timestamp& oplogTs,
- const Timestamp& preImageTs) {
+ const BSONObj& deletedDocId) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX));
BSONElement idElement = deletedDocId["_id"];
@@ -423,8 +390,7 @@ void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx,
return;
}
- opCtx->recoveryUnit()->registerChange(
- new LogOpForShardingHandler(this, idElement.wrap(), 'd', oplogTs, preImageTs));
+ opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'd'));
}
uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() {
@@ -713,27 +679,4 @@ void MigrationChunkClonerSourceLegacy::_xfer(OperationContext* opCtx,
arr.done();
}
-void MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch(OperationContext* opCtx,
- BSONArrayBuilder* arrBuilder) {
- while (_sessionCatalogSource.hasMoreOplog()) {
- auto oplogDoc = _sessionCatalogSource.getLastFetchedOplog();
-
- if (oplogDoc.isEmpty()) {
- // Last fetched turned out empty, try to see if there are more
- _sessionCatalogSource.fetchNextOplog(opCtx);
- continue;
- }
-
- // Use the builder size instead of accumulating the document sizes directly so that we
- // take into consideration the overhead of BSONArray indices.
- if (arrBuilder->arrSize() &&
- (arrBuilder->len() + oplogDoc.objsize() + 1024) > BSONObjMaxUserSize) {
- break;
- }
-
- arrBuilder->append(oplogDoc);
- _sessionCatalogSource.fetchNextOplog(opCtx);
- }
-}
-
} // namespace mongo
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index 72d299b58e4..c072718475a 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -37,7 +37,6 @@
#include "mongo/db/query/plan_executor.h"
#include "mongo/db/s/migration_chunk_cloner_source.h"
#include "mongo/db/s/migration_session_id.h"
-#include "mongo/db/s/session_catalog_migration_source.h"
#include "mongo/s/move_chunk_request.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/stdx/memory.h"
@@ -73,19 +72,11 @@ public:
bool isDocumentInMigratingChunk(const BSONObj& doc) override;
- void onInsertOp(OperationContext* opCtx,
- const BSONObj& insertedDoc,
- const Timestamp& oplogTs) override;
+ void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc) override;
- void onUpdateOp(OperationContext* opCtx,
- const BSONObj& updatedDoc,
- const Timestamp& oplogTs,
- const Timestamp& prePostImageTs) override;
+ void onUpdateOp(OperationContext* opCtx, const BSONObj& updatedDoc) override;
- void onDeleteOp(OperationContext* opCtx,
- const BSONObj& deletedDocId,
- const Timestamp& oplogTs,
- const Timestamp& preImageTs) override;
+ void onDeleteOp(OperationContext* opCtx, const BSONObj& deletedDocId) override;
// Legacy cloner specific functionality
@@ -130,8 +121,6 @@ public:
*/
Status nextModsBatch(OperationContext* opCtx, Database* db, BSONObjBuilder* builder);
- void nextSessionMigrationBatch(OperationContext* opCtx, BSONArrayBuilder* arrBuilder);
-
private:
friend class DeleteNotificationStage;
friend class LogOpForShardingHandler;
@@ -194,8 +183,6 @@ private:
// during the cloning stage
std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> _deleteNotifyExec;
- SessionCatalogMigrationSource _sessionCatalogSource;
-
// Protects the entries below
stdx::mutex _mutex;
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
index d42c6bdaa99..9e3e83774ee 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
@@ -216,57 +216,5 @@ public:
} transferModsCommand;
-/**
- * Command for extracting the oplog entries that needs to be migrated for the given migration
- * session id.
- * Note: this command is not stateless. Calling this command has a side-effect of gradually
- * depleting the buffer that contains the oplog entries to be transfered.
- */
-class MigrateSessionCommand : public BasicCommand {
-public:
- MigrateSessionCommand() : BasicCommand("_getNextSessionMods") {}
-
- void help(std::stringstream& h) const {
- h << "internal";
- }
-
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
- return false;
- }
-
- virtual bool slaveOk() const {
- return false;
- }
-
- virtual bool adminOnly() const {
- return true;
- }
-
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
- ActionSet actions;
- actions.addAction(ActionType::internal);
- out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
- }
-
- bool run(OperationContext* opCtx,
- const std::string&,
- const BSONObj& cmdObj,
- BSONObjBuilder& result) {
- const MigrationSessionId migrationSessionId(
- uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj)));
-
- BSONArrayBuilder arrBuilder;
-
- AutoGetActiveCloner autoCloner(opCtx, migrationSessionId);
- autoCloner.getCloner()->nextSessionMigrationBatch(opCtx, &arrBuilder);
-
- result.appendArray("oplog", arrBuilder.arr());
- return true;
- }
-
-} migrateSessionCommand;
-
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
index a15f6487a33..193c237db5c 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
@@ -246,14 +246,14 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, CorrectDocumentsFetched) {
WriteUnitOfWork wuow(operationContext());
- cloner.onInsertOp(operationContext(), createCollectionDocument(90), {});
- cloner.onInsertOp(operationContext(), createCollectionDocument(150), {});
- cloner.onInsertOp(operationContext(), createCollectionDocument(151), {});
- cloner.onInsertOp(operationContext(), createCollectionDocument(210), {});
-
- cloner.onDeleteOp(operationContext(), createCollectionDocument(80), {}, {});
- cloner.onDeleteOp(operationContext(), createCollectionDocument(199), {}, {});
- cloner.onDeleteOp(operationContext(), createCollectionDocument(220), {}, {});
+ cloner.onInsertOp(operationContext(), createCollectionDocument(90));
+ cloner.onInsertOp(operationContext(), createCollectionDocument(150));
+ cloner.onInsertOp(operationContext(), createCollectionDocument(151));
+ cloner.onInsertOp(operationContext(), createCollectionDocument(210));
+
+ cloner.onDeleteOp(operationContext(), createCollectionDocument(80));
+ cloner.onDeleteOp(operationContext(), createCollectionDocument(199));
+ cloner.onDeleteOp(operationContext(), createCollectionDocument(220));
wuow.commit();
}
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 86688ba08ca..ca2c736a6b1 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -57,8 +57,6 @@
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/service_context.h"
#include "mongo/s/catalog/type_chunk.h"
-#include "mongo/s/client/shard_registry.h"
-#include "mongo/s/grid.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/stdx/chrono.h"
#include "mongo/util/concurrency/notification.h"
@@ -233,8 +231,6 @@ void MigrationDestinationManager::setStateFail(std::string msg) {
_errmsg = std::move(msg);
_state = FAIL;
}
-
- _sessionMigration->forceFail(msg);
}
void MigrationDestinationManager::setStateFailWarn(std::string msg) {
@@ -244,8 +240,6 @@ void MigrationDestinationManager::setStateFailWarn(std::string msg) {
_errmsg = std::move(msg);
_state = FAIL;
}
-
- _sessionMigration->forceFail(msg);
}
bool MigrationDestinationManager::isActive() const {
@@ -340,9 +334,6 @@ Status MigrationDestinationManager::start(const NamespaceString& nss,
_migrateThreadHandle.join();
}
- _sessionMigration =
- stdx::make_unique<SessionCatalogMigrationDestination>(fromShard, *_sessionId);
-
_migrateThreadHandle =
stdx::thread([this, min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern]() {
_migrateThread(min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern);
@@ -406,7 +397,6 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio
<< _sessionId->toString()};
}
- _sessionMigration->finish();
_state = COMMIT_START;
auto const deadline = Date_t::now() + Seconds(30);
@@ -421,7 +411,6 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio
if (_state != DONE) {
return {ErrorCodes::CommandFailed, "startCommit failed, final data failed to transfer"};
}
-
return Status::OK();
}
@@ -683,8 +672,6 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
// 3. Initial bulk clone
setState(CLONE);
- _sessionMigration->start(opCtx->getServiceContext());
-
const BSONObj migrateCloneRequest = createMigrateCloneRequest(_nss, *_sessionId);
_chunkMarkedPending = true; // no lock needed, only the migrate thread looks.
@@ -842,15 +829,13 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
break;
}
- opCtx->sleepFor(Seconds(1));
+ sleepsecs(1);
}
if (t.minutes() >= 600) {
setStateFail("Cannot go to critical section because secondaries cannot keep up");
return;
}
-
- _sessionMigration->waitUntilReadyToCommit(opCtx);
}
{
@@ -910,12 +895,6 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep5);
}
- _sessionMigration->join();
- if (_sessionMigration->getState() == SessionCatalogMigrationDestination::State::ErrorOccurred) {
- setStateFail(_sessionMigration->getErrMsg());
- return;
- }
-
setState(DONE);
timing.done(6);
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index 49c6f1e5089..610f70196c1 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -39,7 +39,6 @@
#include "mongo/db/s/active_migrations_registry.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/migration_session_id.h"
-#include "mongo/db/s/session_catalog_migration_destination.h"
#include "mongo/s/shard_id.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
@@ -212,8 +211,6 @@ private:
State _state{READY};
std::string _errmsg;
-
- std::unique_ptr<SessionCatalogMigrationDestination> _sessionMigration;
};
} // namespace mongo
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index 1c4c250dac1..c9344031555 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -201,6 +201,7 @@ BSONObj getNextSessionOplogBatch(OperationContext* opCtx,
*/
ProcessOplogResult processSessionOplog(OperationContext* opCtx,
const BSONObj& oplogBSON,
+ // const Timestamp& prePostImageTs,
const ProcessOplogResult& lastResult) {
ProcessOplogResult result;
auto oplogEntry = parseOplog(oplogBSON);
@@ -239,62 +240,50 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
const auto& sessionInfo = oplogEntry.getOperationSessionInfo();
result.sessionId = sessionInfo.getSessionId().value();
result.txnNum = sessionInfo.getTxnNumber().value();
- const auto stmtId = *oplogEntry.getStatementId();
auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, result.sessionId);
scopedSession->beginTxn(opCtx, result.txnNum);
- if (scopedSession->checkStatementExecuted(opCtx, result.txnNum, stmtId)) {
- return lastResult;
- }
-
BSONObj object(result.isPrePostImage
? oplogEntry.getObject()
: BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag << 1));
auto oplogLink = extractPrePostImageTs(lastResult, oplogEntry);
oplogLink.prevTs = scopedSession->getLastWriteOpTimeTs(result.txnNum);
- writeConflictRetry(
- opCtx,
- "SessionOplogMigration",
- NamespaceString::kSessionTransactionsTableNamespace.ns(),
- [&] {
- // Need to take global lock here so repl::logOp will not unlock it and trigger the
- // invariant that disallows unlocking global lock while inside a WUOW.
- // Grab a DBLock here instead of plain GlobalLock to make sure the MMAPV1 flush
- // lock will be lock/unlocked correctly. Take the transaction table db lock to
- // ensure the same lock ordering with normal replicated updates to the table.
- Lock::DBLock lk(
- opCtx, NamespaceString::kSessionTransactionsTableNamespace.db(), MODE_IX);
- WriteUnitOfWork wunit(opCtx);
-
- result.oplogTime = repl::logOp(opCtx,
- "n",
- oplogEntry.getNamespace(),
- oplogEntry.getUuid(),
- object,
- &object2,
- true,
- sessionInfo,
- stmtId,
- oplogLink);
-
- auto oplogTs = result.oplogTime.getTimestamp();
- uassert(40633,
- str::stream() << "Failed to create new oplog entry for oplog with opTime: "
- << oplogEntry.getOpTime().toString()
- << ": "
- << redact(oplogBSON),
- !oplogTs.isNull());
-
- // Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post image, because
- // the next oplog will contain the real operation.
- if (!result.isPrePostImage) {
- scopedSession->onWriteOpCompletedOnPrimary(opCtx, result.txnNum, {stmtId}, oplogTs);
- }
-
- wunit.commit();
- });
+ writeConflictRetry(opCtx,
+ "SessionOplogMigration",
+ NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ [&] {
+ Lock::GlobalLock globalLock(opCtx, MODE_IX, UINT_MAX);
+ WriteUnitOfWork wunit(opCtx);
+
+ result.oplogTime = repl::logOp(opCtx,
+ "n",
+ oplogEntry.getNamespace(),
+ oplogEntry.getUuid(),
+ object,
+ &object2,
+ true,
+ sessionInfo,
+ *oplogEntry.getStatementId(),
+ oplogLink);
+
+ auto oplogTs = result.oplogTime.getTimestamp();
+ uassert(40633,
+ str::stream()
+ << "Failed to create new oplog entry for oplog with opTime: "
+ << oplogEntry.getOpTime().toString()
+ << ": "
+ << redact(oplogBSON),
+ !oplogTs.isNull());
+
+ if (!result.isPrePostImage) {
+ scopedSession->onWriteOpCompletedOnPrimary(
+ opCtx, result.txnNum, {*oplogEntry.getStatementId()}, oplogTs);
+ }
+
+ wunit.commit();
+ });
return result;
}
@@ -318,7 +307,6 @@ void SessionCatalogMigrationDestination::start(ServiceContext* service) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
invariant(_state == State::NotStarted);
_state = State::Migrating;
- _isStateChanged.notify_all();
}
_thread = stdx::thread(stdx::bind(
@@ -328,7 +316,6 @@ void SessionCatalogMigrationDestination::start(ServiceContext* service) {
void SessionCatalogMigrationDestination::finish() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
_state = State::Committing;
- _isStateChanged.notify_all();
}
void SessionCatalogMigrationDestination::join() {
@@ -358,7 +345,6 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
// Timestamp prePostImageTs;
ProcessOplogResult lastResult;
- repl::OpTime lastOpTimeWaited;
while (true) {
{
@@ -395,17 +381,8 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
// Note: only transition to "ready to commit" if state is not error/force stop.
if (_state == State::Migrating) {
_state = State::ReadyToCommit;
- _isStateChanged.notify_all();
}
}
-
- if (lastOpTimeWaited == lastResult.oplogTime) {
- // We got an empty result at least twice in a row from the source shard so
- // space it out a little bit so we don't hammer the shard.
- opCtx->sleepFor(Milliseconds(200));
- }
-
- lastOpTimeWaited = lastResult.oplogTime;
}
while (oplogIter.more()) {
@@ -434,7 +411,6 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
{
stdx::lock_guard<stdx::mutex> lk(_mutex);
_state = State::Done;
- _isStateChanged.notify_all();
}
}
@@ -447,8 +423,6 @@ void SessionCatalogMigrationDestination::_errorOccurred(StringData errMsg) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
_state = State::ErrorOccurred;
_errMsg = errMsg.toString();
-
- _isStateChanged.notify_all();
}
SessionCatalogMigrationDestination::State SessionCatalogMigrationDestination::getState() {
@@ -456,15 +430,4 @@ SessionCatalogMigrationDestination::State SessionCatalogMigrationDestination::ge
return _state;
}
-void SessionCatalogMigrationDestination::forceFail(std::string& errMsg) {
- _errorOccurred(errMsg);
-}
-
-void SessionCatalogMigrationDestination::waitUntilReadyToCommit(OperationContext* opCtx) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- while (_state == State::Migrating) {
- opCtx->waitForConditionOrInterrupt(_isStateChanged, lk);
- }
-}
-
} // namespace mongo
diff --git a/src/mongo/db/s/session_catalog_migration_destination.h b/src/mongo/db/s/session_catalog_migration_destination.h
index a67bd8012f6..2e740f2f6bb 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.h
+++ b/src/mongo/db/s/session_catalog_migration_destination.h
@@ -37,7 +37,6 @@
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/s/migration_session_id.h"
#include "mongo/s/shard_id.h"
-#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
#include "mongo/util/concurrency/with_lock.h"
@@ -89,17 +88,6 @@ public:
void join();
/**
- * Forces this into an error state which will also stop session transfer thread.
- */
- void forceFail(std::string& errMsg);
-
- /**
- * Blocks until state changes is not Migrating. In other words, can return when state
- * becomes ReadyToCommit/Done/ErrorOccurred, etc.
- */
- void waitUntilReadyToCommit(OperationContext* opCtx);
-
- /**
* Returns the current state.
*/
State getState();
@@ -121,7 +109,6 @@ private:
// Protects _state and _errMsg.
stdx::mutex _mutex;
- stdx::condition_variable _isStateChanged;
State _state = State::NotStarted;
std::string _errMsg; // valid only if _state == ErrorOccurred.
};
diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
index 1ec90ca88b1..a1a76dfbd1d 100644
--- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
@@ -1303,59 +1303,6 @@ TEST_F(SessionCatalogMigrationDestinationTest,
ASSERT_FALSE(sessionMigration.getErrMsg().empty());
}
-TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatements) {
- const NamespaceString kNs("a.b");
- const auto sessionId = makeLogicalSessionIdForTest();
-
- auto opCtx = operationContext();
- OperationSessionInfo sessionInfo;
- sessionInfo.setSessionId(sessionId);
- sessionInfo.setTxnNumber(19);
-
- insertDocWithSessionInfo(sessionInfo, kNs, BSON("_id" << 46), 30);
-
- SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
- sessionMigration.start(getServiceContext());
- sessionMigration.finish();
-
- OplogEntry oplog1(
- OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
- oplog1.setOperationSessionInfo(sessionInfo);
- oplog1.setStatementId(23);
-
- OplogEntry oplog2(OpTime(Timestamp(70, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
- oplog2.setOperationSessionInfo(sessionInfo);
- oplog2.setStatementId(30);
-
- OplogEntry oplog3(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
- oplog3.setOperationSessionInfo(sessionInfo);
- oplog3.setStatementId(45);
-
- returnOplog({oplog1, oplog2, oplog3});
- returnOplog({});
-
- sessionMigration.join();
-
- ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
-
- auto session = getSessionWithTxn(opCtx, sessionId, 19);
- TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(19));
-
- ASSERT_TRUE(historyIter.hasNext());
- checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
-
- ASSERT_TRUE(historyIter.hasNext());
- checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx));
-
- ASSERT_TRUE(historyIter.hasNext());
- auto firstInsertOplog = historyIter.next(opCtx);
-
- ASSERT_TRUE(firstInsertOplog.getOpType() == OpTypeEnum::kInsert);
- ASSERT_BSONOBJ_EQ(BSON("_id" << 46), firstInsertOplog.getObject());
- ASSERT_TRUE(firstInsertOplog.getStatementId());
- ASSERT_EQ(30, *firstInsertOplog.getStatementId());
-}
-
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp
index 829d862b370..e79656ba94b 100644
--- a/src/mongo/db/s/session_catalog_migration_source.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source.cpp
@@ -76,6 +76,7 @@ BSONObj SessionCatalogMigrationSource::getLastFetchedOplog() {
{
stdx::lock_guard<stdx::mutex> _lk(_newOplogMutex);
+ invariant(!_lastFetchedNewWriteOplog.isEmpty());
return _lastFetchedNewWriteOplog;
}
}
@@ -100,13 +101,11 @@ bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationConte
return false;
}
- auto nextOplogBSON = nextOplog.toBSON().getOwned();
+ _lastFetchedOplog = nextOplog.toBSON().getOwned();
+
auto doc = fetchPrePostImageOplog(opCtx, nextOplog);
if (!doc.isEmpty()) {
- _lastFetchedOplogBuffer.push_back(nextOplogBSON);
- _lastFetchedOplog = doc;
- } else {
- _lastFetchedOplog = nextOplogBSON;
+ _lastFetchedOplogBuffer.push_back(doc);
}
return true;
diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h
index 5e44531d31c..42a3e71e372 100644
--- a/src/mongo/db/s/session_catalog_migration_source.h
+++ b/src/mongo/db/s/session_catalog_migration_source.h
@@ -68,7 +68,6 @@ public:
/**
* Returns the oplog document that was last fetched by the fetchNextOplog call.
- * Returns an empty object if there are no oplog to fetch.
*/
BSONObj getLastFetchedOplog();
diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp
index ff43cce5d0b..ad4942292ea 100644
--- a/src/mongo/db/s/session_catalog_migration_source_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp
@@ -216,7 +216,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd
SessionCatalogMigrationSource migrationSource(kNs);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
- auto expectedSequece = {entry3.toBSON(), entry4.toBSON(), entry1.toBSON(), entry2.toBSON()};
+ auto expectedSequece = {entry4.toBSON(), entry3.toBSON(), entry2.toBSON(), entry1.toBSON()};
for (auto oplogDoc : expectedSequece) {
ASSERT_TRUE(migrationSource.hasMoreOplog());