summary | refs | log | tree | commit | diff
path: root/src/mongo/db/repl/collection_cloner.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/collection_cloner.cpp')
-rw-r--r-- src/mongo/db/repl/collection_cloner.cpp 55
1 files changed, 50 insertions, 5 deletions
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index 41968073f89..4d73098c787 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -200,10 +200,12 @@ void CollectionCloner::runQuery() {
if (_resumeSupported) {
if (_resumeToken) {
// Resume the query from where we left off.
+ LOG(1) << "Collection cloner will resume the last successful query";
query = QUERY("query" << BSONObj() << "$readOnce" << true << "$_requestResumeToken"
<< true << "$_resumeAfter" << _resumeToken.get());
} else {
// New attempt at a resumable query.
+ LOG(1) << "Collection cloner will run a new query";
query = QUERY("query" << BSONObj() << "$readOnce" << true << "$_requestResumeToken"
<< true);
}
@@ -226,13 +228,35 @@ void CollectionCloner::runQuery() {
} catch (...) {
auto status = exceptionToStatus();
+ // If the collection was dropped at any point, we can just move on to the next cloner.
+ // This applies to both resumable (4.4) and non-resumable (4.2) queries.
+ if (status == ErrorCodes::NamespaceNotFound) {
+ throw; // This will re-throw the NamespaceNotFound, resulting in a clean exit.
+ }
+
+ // Wire version 4.2 only.
if (!_resumeSupported) {
- std::string message = str::stream()
- << "Collection clone failed and is not resumable. nss: " << _sourceNss;
- log() << message;
- uasserted(ErrorCodes::InitialSyncFailure, message);
+ // If we lost our cursor last round, the only time we can continue is if we find out
+ // this round that the collection was dropped on the source (that scenario is covered
+ // right above). If that is not the case, then the cloner would have more work to do,
+ // but since we cannot resume the query, we must abort initial sync.
+ if (_lostNonResumableCursor) {
+ abortNonResumableClone(status);
+ }
+
+ // Collection has changed upstream. This will trigger the code block above next round,
+ // (unless we find out the collection was dropped via getting a NamespaceNotFound).
+ if (_queryStage.isCursorError(status)) {
+ log() << "Lost cursor during non-resumable query: " << status;
+ _lostNonResumableCursor = true;
+ throw;
+ }
+ // Any other errors (including network errors, but excluding NamespaceNotFound) result
+ // in immediate failure.
+ abortNonResumableClone(status);
}
+ // Re-throw all query errors for resumable (4.4) queries.
throw;
}
}
@@ -249,6 +273,14 @@ void CollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) {
}
}
+ // If this is 'true', it means that something happened to our remote cursor for a reason other
+ // than the collection being dropped, all while we were running a non-resumable (4.2) clone.
+ // We must abort initial sync in that case.
+ if (_lostNonResumableCursor) {
+ // This will be caught in runQuery().
+ uasserted(ErrorCodes::InitialSyncFailure, "Lost remote cursor");
+ }
+
if (_firstBatchOfQueryRound && _resumeSupported) {
// Store the cursorId of the remote cursor.
_remoteCursorId = iter.getCursorId();
@@ -354,7 +386,7 @@ void CollectionCloner::killOldQueryCursor() {
auto id = _remoteCursorId;
auto cmdObj = BSON("killCursors" << nss.coll() << "cursors" << BSON_ARRAY(id));
-
+ LOG(1) << "Attempting to kill old remote cursor with id: " << id;
try {
getClient()->runCommand(nss.db().toString(), cmdObj, infoObj);
} catch (...) {
@@ -365,6 +397,19 @@ void CollectionCloner::killOldQueryCursor() {
_remoteCursorId = -1;
}
+void CollectionCloner::forgetOldQueryCursor() {
+ _remoteCursorId = -1;  // Only resets the stored id; no command is sent to the remote here. Presumably -1 means "no remote cursor" -- TODO confirm.
+}
+
+// Throws. Logs 'status', then raises InitialSyncFailure naming the source namespace; must only be called when resume is unsupported.
+void CollectionCloner::abortNonResumableClone(const Status& status) {
+ invariant(!_resumeSupported);  // Guard: this path is only valid for non-resumable clones.
+ log() << "Error during non-resumable clone: " << status;
+ std::string message = str::stream()
+ << "Collection clone failed and is not resumable. nss: " << _sourceNss;
+ uasserted(ErrorCodes::InitialSyncFailure, message);  // Note: 'status' is only logged above; it is not part of the thrown message.
+}
+
CollectionCloner::Stats CollectionCloner::getStats() const {
stdx::lock_guard<Latch> lk(_mutex);
return _stats;