summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2019-03-06 14:17:57 -0500
committerRandolph Tan <randolph@10gen.com>2019-03-20 14:48:12 -0400
commit2b576d56ab6ac150ce7b1a5b0f592ffdcca105e9 (patch)
tree28535708947abfdfd850fa9150f1c3c170bebc0d
parent1b82c812a9c0bbf6dc79d5400de9ea99e6ffa025 (diff)
downloadmongo-2b576d56ab6ac150ce7b1a5b0f592ffdcca105e9.tar.gz
SERVER-35219 Change the sleep on the destination side into a cond var wait on the donor side of session migration.
(cherry picked from commit 6d774652650dff718a8fa89c2bc845c3b11aa051)
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp19
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.h10
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp75
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination.cpp7
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.cpp60
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.h38
6 files changed, 187 insertions, 22 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 2d3103a8dd7..e02a0ee23e3 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -342,8 +342,14 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate(
StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::commitClone(OperationContext* opCtx) {
invariant(_state == kCloning);
invariant(!opCtx->lockState()->isLocked());
+
+ if (_sessionCatalogSource) {
+ _sessionCatalogSource->onCommitCloneStarted();
+ }
+
auto responseStatus =
_callRecipient(createRequestWithSessionId(kRecvChunkCommit, _args.getNss(), _sessionId));
+
if (responseStatus.isOK()) {
_cleanup(opCtx);
@@ -363,6 +369,10 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::commitClone(OperationConte
void MigrationChunkClonerSourceLegacy::cancelClone(OperationContext* opCtx) {
invariant(!opCtx->lockState()->isLocked());
+ if (_sessionCatalogSource) {
+ _sessionCatalogSource->onCloneCleanup();
+ }
+
switch (_state) {
case kDone:
break;
@@ -791,4 +801,13 @@ boost::optional<repl::OpTime> MigrationChunkClonerSourceLegacy::nextSessionMigra
return boost::make_optional(opTimeToWait);
}
+std::shared_ptr<Notification<bool>>
+MigrationChunkClonerSourceLegacy::getNotificationForNextSessionMigrationBatch() {
+ if (!_sessionCatalogSource) {
+ return nullptr;
+ }
+
+ return _sessionCatalogSource->getNotificationForNewOplog();
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index e8a1780c9d2..0e8c53feab1 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -165,6 +165,16 @@ public:
boost::optional<repl::OpTime> nextSessionMigrationBatch(OperationContext* opCtx,
BSONArrayBuilder* arrBuilder);
+ /**
+ * Returns a notification that can be used to wait for new oplog that needs to be migrated.
+ * If the value in the notification returns true, it means that there are no more new batches
+ * that needs to be fetched because the migration has already entered the critical section or
+ * aborted.
+ *
+ * Returns nullptr if there is no session migration associated with this migration.
+ */
+ std::shared_ptr<Notification<bool>> getNotificationForNextSessionMigrationBatch();
+
private:
friend class DeleteNotificationStage;
friend class LogOpForShardingHandler;
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
index 2aea9f1d546..fc89e95f851 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
@@ -254,25 +254,37 @@ public:
out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
}
- bool run(OperationContext* opCtx,
- const std::string&,
- const BSONObj& cmdObj,
- BSONObjBuilder& result) {
- const MigrationSessionId migrationSessionId(
- uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj)));
+ /**
+ * Fetches the next batch of oplog that needs to be transferred and appends it to the given
+ * array builder. If it was not able to fetch anything, it will return a non-null notification
+ * that will get signalled when new batches comes in or when migration is over. If the boolean
+ * value from the notification returns true, then the migration has entered the critical
+ * section or aborted and there's no more new batches to fetch.
+ */
+ std::shared_ptr<Notification<bool>> fetchNextSessionMigrationBatch(
+ OperationContext* opCtx,
+ const MigrationSessionId& migrationSessionId,
+ BSONArrayBuilder* arrBuilder) {
+ boost::optional<repl::OpTime> opTime;
+ std::shared_ptr<Notification<bool>> newOplogNotification;
- BSONArrayBuilder arrBuilder;
+ writeConflictRetry(
+ opCtx,
+ "Fetching session related oplogs for migration",
+ NamespaceString::kRsOplogNamespace.ns(),
+ [&]() {
+ AutoGetActiveCloner autoCloner(opCtx, migrationSessionId);
+ opTime = autoCloner.getCloner()->nextSessionMigrationBatch(opCtx, arrBuilder);
- boost::optional<repl::OpTime> opTime;
+ if (arrBuilder->arrSize() == 0) {
+ newOplogNotification =
+ autoCloner.getCloner()->getNotificationForNextSessionMigrationBatch();
+ }
+ });
- writeConflictRetry(opCtx,
- "Fetching session related oplogs for migration",
- NamespaceString::kRsOplogNamespace.ns(),
- [&]() {
- AutoGetActiveCloner autoCloner(opCtx, migrationSessionId);
- opTime = autoCloner.getCloner()->nextSessionMigrationBatch(
- opCtx, &arrBuilder);
- });
+ if (newOplogNotification) {
+ return newOplogNotification;
+ }
// If the batch returns something, we wait for write concern to ensure that all the entries
// in the batch have been majority committed. We then need to check that the rollback id
@@ -300,7 +312,38 @@ public:
rollbackId == rollbackIdAtMigrationInit);
}
+ return nullptr;
+ }
+
+ bool run(OperationContext* opCtx,
+ const std::string&,
+ const BSONObj& cmdObj,
+ BSONObjBuilder& result) {
+ const MigrationSessionId migrationSessionId(
+ uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj)));
+
+ BSONArrayBuilder arrBuilder;
+ bool hasMigrationCompleted = false;
+
+ do {
+ if (auto newOplogNotification =
+ fetchNextSessionMigrationBatch(opCtx, migrationSessionId, &arrBuilder)) {
+ hasMigrationCompleted = newOplogNotification->get(opCtx);
+ } else if (arrBuilder.arrSize() == 0) {
+ // If we didn't get a notification and the arrBuilder is empty, that means
+ // that the sessionMigration is not active for this migration (most likely
+ // because it's not a replica set).
+ hasMigrationCompleted = true;
+ }
+ } while (arrBuilder.arrSize() == 0 && !hasMigrationCompleted);
+
result.appendArray("oplog", arrBuilder.arr());
+
+ // TODO: SERVER-40187 remove after v4.2. This is to indicate the caller that this server
+ // waits for notification on new oplog entries to send over so the caller doesn't need
+ // to throttle.
+ result.append("waitsForNewOplog", true);
+
return true;
}
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index e0c91bfbeee..6ae50ee26fb 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -54,6 +54,7 @@ namespace mongo {
namespace {
const auto kOplogField = "oplog";
+const auto kWaitsForNewOplogField = "waitsForNewOplog";
const WriteConcernOptions kMajorityWC(WriteConcernOptions::kMajority,
WriteConcernOptions::SyncMode::UNSET,
Milliseconds(0));
@@ -394,6 +395,7 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
auto nextBatch = getNextSessionOplogBatch(opCtx, _fromShard, _migrationSessionId);
BSONArray oplogArray(nextBatch[kOplogField].Obj());
BSONArrayIteratorSorted oplogIter(oplogArray);
+ const auto donorWaitsForNewOplog = nextBatch[kWaitsForNewOplogField].trueValue();
if (!oplogIter.more()) {
{
@@ -425,7 +427,10 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
}
}
- if (lastOpTimeWaited == lastResult.oplogTime) {
+ // TODO: SERVER-40187 Completely remove after v4.2. donorWaitsForNewOplog is a
+ // 'feature flag' indicating that the donor does block until there are new oplog
+ // to return so we don't need to sleep here.
+ if (!donorWaitsForNewOplog && lastOpTimeWaited == lastResult.oplogTime) {
// We got an empty result at least twice in a row from the source shard so space it
// out a little bit so we don't hammer the shard
opCtx->sleepFor(Milliseconds(200));
diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp
index d41d7481588..7d451b0b81e 100644
--- a/src/mongo/db/s/session_catalog_migration_source.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source.cpp
@@ -163,7 +163,32 @@ SessionCatalogMigrationSource::SessionCatalogMigrationSource(OperationContext* o
}
bool SessionCatalogMigrationSource::hasMoreOplog() {
- return _hasMoreOplogFromSessionCatalog() || _hasNewWrites();
+ if (_hasMoreOplogFromSessionCatalog()) {
+ return true;
+ }
+
+ stdx::lock_guard<stdx::mutex> lk(_newOplogMutex);
+ return _hasNewWrites(lk);
+}
+
+void SessionCatalogMigrationSource::onCommitCloneStarted() {
+ stdx::lock_guard<stdx::mutex> _lk(_newOplogMutex);
+
+ _state = State::kCommitStarted;
+ if (_newOplogNotification) {
+ _newOplogNotification->set(true);
+ _newOplogNotification.reset();
+ }
+}
+
+void SessionCatalogMigrationSource::onCloneCleanup() {
+ stdx::lock_guard<stdx::mutex> _lk(_newOplogMutex);
+
+ _state = State::kCleanup;
+ if (_newOplogNotification) {
+ _newOplogNotification->set(true);
+ _newOplogNotification.reset();
+ }
}
SessionCatalogMigrationSource::OplogResult SessionCatalogMigrationSource::getLastFetchedOplog() {
@@ -188,6 +213,31 @@ bool SessionCatalogMigrationSource::fetchNextOplog(OperationContext* opCtx) {
return _fetchNextNewWriteOplog(opCtx);
}
+std::shared_ptr<Notification<bool>> SessionCatalogMigrationSource::getNotificationForNewOplog() {
+ invariant(!_hasMoreOplogFromSessionCatalog());
+
+ stdx::lock_guard<stdx::mutex> lk(_newOplogMutex);
+
+ if (_newOplogNotification) {
+ return _newOplogNotification;
+ }
+
+ auto notification = std::make_shared<Notification<bool>>();
+ if (_state == State::kCleanup) {
+ notification->set(true);
+ }
+ // Even if commit has started, we still need to drain the current buffer.
+ else if (_hasNewWrites(lk)) {
+ notification->set(false);
+ } else if (_state == State::kCommitStarted) {
+ notification->set(true);
+ } else {
+ _newOplogNotification = notification;
+ }
+
+ return notification;
+}
+
bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationContext* opCtx) {
if (_currentOplogIterator) {
if (_currentOplogIterator->hasNext()) {
@@ -252,8 +302,7 @@ bool SessionCatalogMigrationSource::_fetchNextOplogFromSessionCatalog(OperationC
return false;
}
-bool SessionCatalogMigrationSource::_hasNewWrites() {
- stdx::lock_guard<stdx::mutex> lk(_newOplogMutex);
+bool SessionCatalogMigrationSource::_hasNewWrites(WithLock) {
return _lastFetchedNewWriteOplog || !_newWriteOpTimeList.empty();
}
@@ -294,6 +343,11 @@ bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* op
void SessionCatalogMigrationSource::notifyNewWriteOpTime(repl::OpTime opTime) {
stdx::lock_guard<stdx::mutex> lk(_newOplogMutex);
_newWriteOpTimeList.push_back(opTime);
+
+ if (_newOplogNotification) {
+ _newOplogNotification->set(false);
+ _newOplogNotification.reset();
+ }
}
SessionCatalogMigrationSource::SessionOplogIterator::SessionOplogIterator(
diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h
index f50caffb650..148c5c29d60 100644
--- a/src/mongo/db/s/session_catalog_migration_source.h
+++ b/src/mongo/db/s/session_catalog_migration_source.h
@@ -40,6 +40,7 @@
#include "mongo/db/session_txn_record_gen.h"
#include "mongo/db/transaction_history_iterator.h"
#include "mongo/stdx/mutex.h"
+#include "mongo/util/concurrency/notification.h"
#include "mongo/util/concurrency/with_lock.h"
namespace mongo {
@@ -101,6 +102,17 @@ public:
bool fetchNextOplog(OperationContext* opCtx);
/**
+ * Returns a notification that can be used to wait for new oplog entries to fetch. Note
+ * that this should only be called if hasMoreOplog/fetchNextOplog returned false at
+ * least once.
+ *
+ * If the notification is set to true, then that means that there is no longer a need to
+ * fetch more oplog because the data migration has entered the critical section and
+ * the buffer for oplog to fetch is empty or the data migration has aborted.
+ */
+ std::shared_ptr<Notification<bool>> getNotificationForNewOplog();
+
+ /**
* Returns the oplog document that was last fetched by the fetchNextOplog call.
* Returns an empty object if there are no oplog to fetch.
*/
@@ -118,6 +130,18 @@ public:
return _rollbackIdAtInit;
}
+ /**
+ * Inform this session migration machinery that the data migration just entered the critical
+ * section.
+ */
+ void onCommitCloneStarted();
+
+ /**
+ * Inform this session migration machinery that the data migration just terminated and
+ * entering the cleanup phase (can be aborted or committed).
+ */
+ void onCloneCleanup();
+
private:
/**
* An iterator for extracting session write oplogs that need to be cloned during migration.
@@ -149,6 +173,8 @@ private:
std::unique_ptr<TransactionHistoryIterator> _writeHistoryIterator;
};
+ enum class State { kActive, kCommitStarted, kCleanup };
+
///////////////////////////////////////////////////////////////////////////
// Methods for extracting the oplog entries from session information.
@@ -179,7 +205,7 @@ private:
/**
* Returns true if there are oplog generated by new writes that needs to be fetched.
*/
- bool _hasNewWrites();
+ bool _hasNewWrites(WithLock);
/**
* Attempts to fetch the next oplog entry from the new writes that was saved by saveNewWriteTS.
@@ -212,7 +238,7 @@ private:
// Used to store the last fetched oplog. This enables calling get multiple times.
boost::optional<repl::OplogEntry> _lastFetchedOplog;
- // Protects _newWriteTsList, _lastFetchedNewWriteOplog
+ // Protects _newWriteTsList, _lastFetchedNewWriteOplog, _state, _newOplogNotification
stdx::mutex _newOplogMutex;
// Stores oplog opTime of new writes that are coming in.
@@ -220,6 +246,14 @@ private:
// Used to store the last fetched oplog from _newWriteTsList.
boost::optional<repl::OplogEntry> _lastFetchedNewWriteOplog;
+
+ // Stores the current state.
+ State _state{State::kActive};
+
+ // Holds the latest request for notification of new oplog entries that needs to be fetched.
+ // Sets to true if there is no need to fetch an oplog anymore (for example, because migration
+ // aborted).
+ std::shared_ptr<Notification<bool>> _newOplogNotification;
};
} // namespace mongo