diff options
author | Benety Goh <benety@mongodb.com> | 2017-10-30 16:32:16 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2018-01-16 15:38:19 -0500 |
commit | 25f0c6ae6c4fa80244cadb8a6bfcbf9bcc8b7742 (patch) | |
tree | 1f0c6d675168d36de29655936e04f07a5c1708d4 | |
parent | 939a30aba1703c939b0352bded4b56257c8a0bec (diff) | |
download | mongo-25f0c6ae6c4fa80244cadb8a6bfcbf9bcc8b7742.tar.gz |
SERVER-29200 migrate SyncTail::getMissingDoc() and fetchAndInsertMissingDoc() to accept OplogEntry instead of BSONObj
-rw-r--r-- | src/mongo/db/repl/master_slave.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 55 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test.cpp | 9 | ||||
-rw-r--r-- | src/mongo/dbtests/repltests.cpp | 49 |
5 files changed, 77 insertions, 44 deletions
diff --git a/src/mongo/db/repl/master_slave.cpp b/src/mongo/db/repl/master_slave.cpp index 5cac46ad48c..08961a2c840 100644 --- a/src/mongo/db/repl/master_slave.cpp +++ b/src/mongo/db/repl/master_slave.cpp @@ -674,7 +674,8 @@ void ReplSource::applyOperation(OperationContext* opCtx, Database* db, const BSO // sync source. SyncTail sync(nullptr, SyncTail::MultiSyncApplyFunc()); sync.setHostname(hostName); - sync.fetchAndInsertMissingDocument(opCtx, op); + OplogEntry oplogEntry(op); + sync.fetchAndInsertMissingDocument(opCtx, oplogEntry); } } catch (AssertionException& e) { log() << "sync: caught user assertion " << redact(e) << " while applying op: " << redact(op) diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index f25e398443a..cfee963f0f8 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -272,13 +272,13 @@ void ApplyBatchFinalizerForJournal::_run() { } } -NamespaceString parseUUIDOrNs(OperationContext* opCtx, const BSONObj& o) { - auto statusWithUUID = UUID::parse(o.getField("ui")); - if (!statusWithUUID.isOK()) { - return NamespaceString(o.getStringField("ns")); +NamespaceString parseUUIDOrNs(OperationContext* opCtx, const OplogEntry& oplogEntry) { + auto optionalUuid = oplogEntry.getUuid(); + if (!optionalUuid) { + return oplogEntry.getNamespace(); } - const auto& uuid = statusWithUUID.getValue(); + const auto& uuid = optionalUuid.get(); auto& catalog = UUIDCatalog::get(opCtx); auto nss = catalog.lookupNSSByUUID(uuid); uassert(ErrorCodes::NamespaceNotFound, @@ -1077,7 +1077,7 @@ OldThreadPool* SyncTail::getWriterPool() { return _writerPool.get(); } -BSONObj SyncTail::getMissingDoc(OperationContext* opCtx, const BSONObj& o) { +BSONObj SyncTail::getMissingDoc(OperationContext* opCtx, const OplogEntry& oplogEntry) { OplogReader missingObjReader; // why are we using OplogReader to run a non-oplog query? if (MONGO_FAIL_POINT(initialSyncHangBeforeGettingMissingDocument)) { @@ -1108,26 +1108,25 @@ BSONObj SyncTail::getMissingDoc(OperationContext* opCtx, const BSONObj& o) { } // get _id from oplog entry to create query to fetch document. - const BSONElement opElem = o.getField("op"); - const bool isUpdate = !opElem.eoo() && opElem.str() == "u"; - const BSONElement idElem = o.getObjectField(isUpdate ? "o2" : "o")["_id"]; + const auto idElem = oplogEntry.getIdElement(); if (idElem.eoo()) { - severe() << "cannot fetch missing document without _id field: " << redact(o); + severe() << "cannot fetch missing document without _id field: " + << redact(oplogEntry.toBSON()); fassertFailedNoTrace(28742); } BSONObj query = BSONObjBuilder().append(idElem).obj(); BSONObj missingObj; - const char* ns = o.getStringField("ns"); + auto nss = oplogEntry.getNamespace(); try { - if (o.getField("ui").eoo()) { - missingObj = missingObjReader.findOne(ns, query); + auto uuid = oplogEntry.getUuid(); + if (!uuid) { + missingObj = missingObjReader.findOne(nss.ns().c_str(), query); } else { - auto uuid = uassertStatusOK(UUID::parse(o.getField("ui"))); - auto dbname = nsToDatabaseSubstring(ns); + auto dbname = nss.db(); // If a UUID exists for the command object, find the document by UUID. - missingObj = missingObjReader.findOneByUUID(dbname.toString(), uuid, query); + missingObj = missingObjReader.findOneByUUID(dbname.toString(), *uuid, query); } } catch (const SocketException&) { warning() << "network problem detected while fetching a missing document from the " @@ -1146,11 +1145,12 @@ BSONObj SyncTail::getMissingDoc(OperationContext* opCtx, const BSONObj& o) { str::stream() << "Can no longer connect to initial sync source: " << _hostname); } -bool SyncTail::fetchAndInsertMissingDocument(OperationContext* opCtx, const BSONObj& o) { +bool SyncTail::fetchAndInsertMissingDocument(OperationContext* opCtx, + const OplogEntry& oplogEntry) { // Note that using the local UUID/NamespaceString mapping is sufficient for checking // whether the collection is capped on the remote because convertToCapped creates a // new collection with a different UUID. - const NamespaceString nss(parseUUIDOrNs(opCtx, o)); + const NamespaceString nss(parseUUIDOrNs(opCtx, oplogEntry)); { // If the document is in a capped collection then it's okay for it to be missing. @@ -1162,14 +1162,17 @@ bool SyncTail::fetchAndInsertMissingDocument(OperationContext* opCtx, const BSON } } - log() << "Fetching missing document: " << redact(o); - BSONObj missingObj = getMissingDoc(opCtx, o); + log() << "Fetching missing document: " << redact(oplogEntry.toBSON()); + BSONObj missingObj = getMissingDoc(opCtx, oplogEntry); if (missingObj.isEmpty()) { + BSONObj object2; + if (auto optionalObject2 = oplogEntry.getObject2()) { + object2 = *optionalObject2; + } log() << "Missing document not found on source; presumably deleted later in oplog. o first " "field: " - << o.getObjectField("o").firstElementFieldName() - << ", o2: " << redact(o.getObjectField("o2")); + << redact(oplogEntry.getObject()) << ", o2: " << redact(object2); return false; } @@ -1182,7 +1185,8 @@ bool SyncTail::fetchAndInsertMissingDocument(OperationContext* opCtx, const BSON WriteUnitOfWork wunit(opCtx); Collection* coll = nullptr; - if (o.getField("ui").eoo()) { + auto uuid = oplogEntry.getUuid(); + if (!uuid) { if (!db) { return false; } @@ -1190,9 +1194,8 @@ bool SyncTail::fetchAndInsertMissingDocument(OperationContext* opCtx, const BSON } else { // If the oplog entry has a UUID, use it to find the collection in which to insert the // missing document. - auto uuid = uassertStatusOK(UUID::parse(o.getField("ui"))); auto& catalog = UUIDCatalog::get(opCtx); - coll = catalog.lookupCollectionByUUID(uuid); + coll = catalog.lookupCollectionByUUID(*uuid); if (!coll) { // TODO(SERVER-30819) insert this UUID into the missing UUIDs set. return false; @@ -1447,7 +1450,7 @@ Status multiInitialSyncApply_noAbort(OperationContext* opCtx, // We might need to fetch the missing docs from the sync source. fetchCount->fetchAndAdd(1); - st->fetchAndInsertMissingDocument(opCtx, entry.raw); + st->fetchAndInsertMissingDocument(opCtx, entry); } } catch (const DBException& e) { // SERVER-24927 If we have a NamespaceNotFound exception, then this document will be diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index e082f5cb52e..c5c4d83002b 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -204,14 +204,15 @@ public: /** * Fetch a single document referenced in the operation from the sync source. */ - virtual BSONObj getMissingDoc(OperationContext* opCtx, const BSONObj& o); + virtual BSONObj getMissingDoc(OperationContext* opCtx, const OplogEntry& oplogEntry); /** * If an update fails, fetches the missing document and inserts it into the local collection. * * Returns true if the document was fetched and inserted successfully. */ - virtual bool fetchAndInsertMissingDocument(OperationContext* opCtx, const BSONObj& o); + virtual bool fetchAndInsertMissingDocument(OperationContext* opCtx, + const OplogEntry& oplogEntry); void setHostname(const std::string& hostname); diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index 3cd5f41d68c..aa2db8e14aa 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -107,7 +107,7 @@ repl::OplogEntry makeOplogEntry(StringData ns) { class SyncTailWithLocalDocumentFetcher : public SyncTail { public: SyncTailWithLocalDocumentFetcher(const BSONObj& document); - BSONObj getMissingDoc(OperationContext* opCtx, const BSONObj& o) override; + BSONObj getMissingDoc(OperationContext* opCtx, const OplogEntry& oplogEntry) override; private: BSONObj _document; @@ -119,13 +119,14 @@ private: class SyncTailWithOperationContextChecker : public SyncTail { public: SyncTailWithOperationContextChecker(); - bool fetchAndInsertMissingDocument(OperationContext* opCtx, const BSONObj& o) override; + bool fetchAndInsertMissingDocument(OperationContext* opCtx, + const OplogEntry& oplogEntry) override; }; SyncTailWithLocalDocumentFetcher::SyncTailWithLocalDocumentFetcher(const BSONObj& document) : SyncTail(nullptr, SyncTail::MultiSyncApplyFunc(), nullptr), _document(document) {} -BSONObj SyncTailWithLocalDocumentFetcher::getMissingDoc(OperationContext*, const BSONObj&) { +BSONObj SyncTailWithLocalDocumentFetcher::getMissingDoc(OperationContext*, const OplogEntry&) { return _document; } @@ -133,7 +134,7 @@ SyncTailWithOperationContextChecker::SyncTailWithOperationContextChecker() : SyncTail(nullptr, SyncTail::MultiSyncApplyFunc(), nullptr) {} bool SyncTailWithOperationContextChecker::fetchAndInsertMissingDocument(OperationContext* opCtx, - const BSONObj&) { + const OplogEntry&) { ASSERT_FALSE(opCtx->writesAreReplicated()); ASSERT_FALSE(opCtx->lockState()->shouldConflictWithSecondaryBatchApplication()); ASSERT_TRUE(documentValidationDisabled(opCtx)); diff --git a/src/mongo/dbtests/repltests.cpp b/src/mongo/dbtests/repltests.cpp index 37e04a96a68..c8d076480bf 100644 --- a/src/mongo/dbtests/repltests.cpp +++ b/src/mongo/dbtests/repltests.cpp @@ -64,6 +64,31 @@ using std::string; using std::stringstream; using std::vector; +/** + * Creates an OplogEntry with given parameters and preset defaults for this test suite. + */ +repl::OplogEntry makeOplogEntry(repl::OpTime opTime, + repl::OpTypeEnum opType, + NamespaceString nss, + BSONObj object, + boost::optional<BSONObj> object2) { + return repl::OplogEntry(opTime, // optime + 0, // hash + opType, // opType + nss, // namespace + boost::none, // uuid + boost::none, // fromMigrate + OplogEntry::kOplogVersion, // version + object, // o + object2, // o2 + {}, // sessionInfo + boost::none, // wall clock time + boost::none, // statement id + boost::none, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none); // post-image optime +} + BSONObj f(const char* s) { return fromjson(s); } @@ -1324,7 +1349,7 @@ public: bool returnEmpty; SyncTest() : SyncTail(nullptr, SyncTail::MultiSyncApplyFunc()), returnEmpty(false) {} virtual ~SyncTest() {} - virtual BSONObj getMissingDoc(OperationContext* opCtx, const BSONObj& o) { + BSONObj getMissingDoc(OperationContext* opCtx, const OplogEntry& oplogEntry) override { if (returnEmpty) { BSONObj o; return o; @@ -1340,13 +1365,15 @@ class FetchAndInsertMissingDocument : public Base { public: void run() { bool threw = false; - BSONObj o = BSON("ns" << ns() << "o" << BSON("foo" - << "bar") - << "o2" - << BSON("_id" - << "in oplog" - << "foo" - << "bar")); + auto oplogEntry = makeOplogEntry(OpTime(Timestamp(100, 1), 1LL), // optime + OpTypeEnum::kUpdate, // op type + NamespaceString(ns()), // namespace + BSON("foo" + << "bar"), // o + BSON("_id" + << "in oplog" + << "foo" + << "bar")); // o2 Lock::GlobalWrite lk(&_opCtx); @@ -1356,7 +1383,7 @@ public: badSource.setHostname("localhost:123"); OldClientContext ctx(&_opCtx, ns()); - badSource.getMissingDoc(&_opCtx, o); + badSource.getMissingDoc(&_opCtx, oplogEntry); } catch (DBException&) { threw = true; } @@ -1364,7 +1391,7 @@ public: // now this should succeed SyncTest t; - verify(t.fetchAndInsertMissingDocument(&_opCtx, o)); + verify(t.fetchAndInsertMissingDocument(&_opCtx, oplogEntry)); verify(!_client .findOne(ns(), BSON("_id" @@ -1373,7 +1400,7 @@ public: // force it not to find an obj t.returnEmpty = true; - verify(!t.fetchAndInsertMissingDocument(&_opCtx, o)); + verify(!t.fetchAndInsertMissingDocument(&_opCtx, oplogEntry)); } }; |