diff options
Diffstat (limited to 'src/mongo/db/repl/sync_tail.cpp')
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 94 |
1 files changed, 44 insertions, 50 deletions
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index edb82cc82a4..8c0c0229508 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -935,17 +935,10 @@ OldThreadPool* SyncTail::getWriterPool() { return _writerPool.get(); } -BSONObj SyncTail::getMissingDoc(OperationContext* txn, Database* db, const BSONObj& o) { +BSONObj SyncTail::getMissingDoc(OperationContext* txn, const BSONObj& o) { OplogReader missingObjReader; // why are we using OplogReader to run a non-oplog query? const char* ns = o.getStringField("ns"); - // capped collections - Collection* collection = db->getCollection(ns); - if (collection && collection->isCapped()) { - log() << "missing doc, but this is okay for a capped collection (" << ns << ")"; - return BSONObj(); - } - if (MONGO_FAIL_POINT(initialSyncHangBeforeGettingMissingDocument)) { log() << "initial sync - initialSyncHangBeforeGettingMissingDocument fail point enabled. " "Blocking until fail point is disabled."; @@ -1004,43 +997,52 @@ BSONObj SyncTail::getMissingDoc(OperationContext* txn, Database* db, const BSONO str::stream() << "Can no longer connect to initial sync source: " << _hostname); } -bool SyncTail::shouldRetry(OperationContext* txn, const BSONObj& o) { +bool SyncTail::fetchAndInsertMissingDocument(OperationContext* txn, const BSONObj& o) { const NamespaceString nss(o.getStringField("ns")); + + { + // If the document is in a capped collection then it's okay for it to be missing. + AutoGetCollectionForRead autoColl(txn, nss); + Collection* const collection = autoColl.getCollection(); + if (collection && collection->isCapped()) { + log() << "Not fetching missing document in capped collection (" << nss << ")"; + return false; + } + } + + log() << "Fetching missing document: " << redact(o); + BSONObj missingObj = getMissingDoc(txn, o); + + if (missingObj.isEmpty()) { + log() << "Missing document not found on source; presumably deleted later in oplog. o first " + "field: " + << o.getObjectField("o").firstElementFieldName() + << ", o2: " << redact(o.getObjectField("o2")); + + return false; + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { // Take an X lock on the database in order to preclude other modifications. // Also, the database might not exist yet, so create it. AutoGetOrCreateDb autoDb(txn, nss.db(), MODE_X); Database* const db = autoDb.getDb(); - // we don't have the object yet, which is possible on initial sync. get it. - log() << "adding missing object" << endl; // rare enough we can log + WriteUnitOfWork wunit(txn); - BSONObj missingObj = getMissingDoc(txn, db, o); + Collection* const coll = db->getOrCreateCollection(txn, nss.toString()); + invariant(coll); - if (missingObj.isEmpty()) { - log() << "missing object not found on source." - " presumably deleted later in oplog"; - log() << "o2: " << redact(o.getObjectField("o2")); - log() << "o firstfield: " << o.getObjectField("o").firstElementFieldName(); + OpDebug* const nullOpDebug = nullptr; + Status status = coll->insertDocument(txn, missingObj, nullOpDebug, true); + uassert(15917, + str::stream() << "Failed to insert missing document: " << status.toString(), + status.isOK()); - return false; - } else { - WriteUnitOfWork wunit(txn); - - Collection* const coll = db->getOrCreateCollection(txn, nss.toString()); - invariant(coll); + LOG(1) << "Inserted missing document: " << redact(missingObj); - OpDebug* const nullOpDebug = nullptr; - Status status = coll->insertDocument(txn, missingObj, nullOpDebug, true); - uassert(15917, - str::stream() << "failed to insert missing doc: " << status.toString(), - status.isOK()); - - LOG(1) << "inserted missing doc: " << redact(missingObj); - - wunit.commit(); - return true; - } + wunit.commit(); + return true; } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "InsertRetry", nss.ns()); @@ -1196,27 +1198,19 @@ Status multiInitialSyncApply_noAbort(OperationContext* txn, try { const Status s = SyncTail::syncApply(txn, entry.raw, inSteadyStateReplication); if (!s.isOK()) { - // Don't retry on commands. - if (entry.isCommand()) { - error() << "Error applying command (" << redact(entry.raw) - << "): " << redact(s); + // In initial sync, update operations can cause documents to be missed during + // collection cloning. As a result, it is possible that a document that we need to + // update is not present locally. In that case we fetch the document from the + // sync source. + if (s != ErrorCodes::UpdateOperationFailed) { + error() << "Error applying operation: " << redact(s) << " (" + << redact(entry.raw) << ")"; return s; } // We might need to fetch the missing docs from the sync source. fetchCount->fetchAndAdd(1); - if (st->shouldRetry(txn, entry.raw)) { - const Status s2 = SyncTail::syncApply(txn, entry.raw, inSteadyStateReplication); - if (!s2.isOK()) { - severe() << "Error applying operation (" << redact(entry.raw) - << "): " << redact(s2); - return s2; - } - } - - // If shouldRetry() returns false, fall through. - // This can happen if the document that was moved and missed by Cloner - // subsequently got deleted and no longer exists on the Sync Target at all + st->fetchAndInsertMissingDocument(txn, entry.raw); } } catch (const DBException& e) { // SERVER-24927 If we have a NamespaceNotFound exception, then this document will be |