diff options
Diffstat (limited to 'src/mongo/db/repl/collection_cloner.cpp')
-rw-r--r-- | src/mongo/db/repl/collection_cloner.cpp | 55 |
1 files changed, 50 insertions, 5 deletions
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index 41968073f89..4d73098c787 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -200,10 +200,12 @@ void CollectionCloner::runQuery() { if (_resumeSupported) { if (_resumeToken) { // Resume the query from where we left off. + LOG(1) << "Collection cloner will resume the last successful query"; query = QUERY("query" << BSONObj() << "$readOnce" << true << "$_requestResumeToken" << true << "$_resumeAfter" << _resumeToken.get()); } else { // New attempt at a resumable query. + LOG(1) << "Collection cloner will run a new query"; query = QUERY("query" << BSONObj() << "$readOnce" << true << "$_requestResumeToken" << true); } @@ -226,13 +228,35 @@ void CollectionCloner::runQuery() { } catch (...) { auto status = exceptionToStatus(); + // If the collection was dropped at any point, we can just move on to the next cloner. + // This applies to both resumable (4.4) and non-resumable (4.2) queries. + if (status == ErrorCodes::NamespaceNotFound) { + throw; // This will re-throw the NamespaceNotFound, resulting in a clean exit. + } + + // Wire version 4.2 only. if (!_resumeSupported) { - std::string message = str::stream() - << "Collection clone failed and is not resumable. nss: " << _sourceNss; - log() << message; - uasserted(ErrorCodes::InitialSyncFailure, message); + // If we lost our cursor last round, the only time we can can continue is if we find out + // this round that the collection was dropped on the source (that scenario is covered + // right above). If that is not the case, then the cloner would have more work to do, + // but since we cannot resume the query, we must abort initial sync. + if (_lostNonResumableCursor) { + abortNonResumableClone(status); + } + + // Collection has changed upstream. This will trigger the code block above next round, + // (unless we find out the collection was dropped via getting a NamespaceNotFound). + if (_queryStage.isCursorError(status)) { + log() << "Lost cursor during non-resumable query: " << status; + _lostNonResumableCursor = true; + throw; + } + // Any other errors (including network errors, but excluding NamespaceNotFound) result + // in immediate failure. + abortNonResumableClone(status); } + // Re-throw all query errors for resumable (4.4) queries. throw; } } @@ -249,6 +273,14 @@ void CollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) { } } + // If this is 'true', it means that something happened to our remote cursor for a reason other + // than the collection being dropped, all while we were running a non-resumable (4.2) clone. + // We must abort initial sync in that case. + if (_lostNonResumableCursor) { + // This will be caught in runQuery(). + uasserted(ErrorCodes::InitialSyncFailure, "Lost remote cursor"); + } + if (_firstBatchOfQueryRound && _resumeSupported) { // Store the cursorId of the remote cursor. _remoteCursorId = iter.getCursorId(); @@ -354,7 +386,7 @@ void CollectionCloner::killOldQueryCursor() { auto id = _remoteCursorId; auto cmdObj = BSON("killCursors" << nss.coll() << "cursors" << BSON_ARRAY(id)); - + LOG(1) << "Attempting to kill old remote cursor with id: " << id; try { getClient()->runCommand(nss.db().toString(), cmdObj, infoObj); } catch (...) { @@ -365,6 +397,19 @@ void CollectionCloner::killOldQueryCursor() { _remoteCursorId = -1; } +void CollectionCloner::forgetOldQueryCursor() { + _remoteCursorId = -1; +} + +// Throws. +void CollectionCloner::abortNonResumableClone(const Status& status) { + invariant(!_resumeSupported); + log() << "Error during non-resumable clone: " << status; + std::string message = str::stream() + << "Collection clone failed and is not resumable. nss: " << _sourceNss; + uasserted(ErrorCodes::InitialSyncFailure, message); +} + CollectionCloner::Stats CollectionCloner::getStats() const { stdx::lock_guard<Latch> lk(_mutex); return _stats; |