author     Benety Goh <benety@mongodb.com>    2017-10-30 16:32:16 -0400
committer  Benety Goh <benety@mongodb.com>    2018-01-16 15:38:19 -0500
commit     25f0c6ae6c4fa80244cadb8a6bfcbf9bcc8b7742 (patch)
tree       1f0c6d675168d36de29655936e04f07a5c1708d4
parent     939a30aba1703c939b0352bded4b56257c8a0bec (diff)
SERVER-29200 migrate SyncTail::getMissingDoc() and SyncTail::fetchAndInsertMissingDocument() to accept OplogEntry instead of BSONObj
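At its core, the patch swaps raw BSONObj field probing for the typed accessors on OplogEntry wherever SyncTail handles missing-document fetching. The following is a minimal before/after sketch that uses only the accessors appearing in the hunks below; the header path is the in-tree one, and the snippet is not intended to build outside the server source tree.

    #include "mongo/db/repl/oplog_entry.h"  // in-tree header for repl::OplogEntry

    namespace mongo {
    namespace repl {

    // Old style: dig individual fields ("ns", "ui", "o", "o2") out of the raw document.
    NamespaceString nssFromRawOp(const BSONObj& op) {
        return NamespaceString(op.getStringField("ns"));
    }

    // New style: parse once, then use the typed accessors exercised in this patch
    // (getNamespace, getUuid, getIdElement, getObject, getObject2).
    NamespaceString nssFromOplogEntry(const BSONObj& op) {
        OplogEntry oplogEntry(op);
        return oplogEntry.getNamespace();
    }

    }  // namespace repl
    }  // namespace mongo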
-rw-r--r--  src/mongo/db/repl/master_slave.cpp    |  3
-rw-r--r--  src/mongo/db/repl/sync_tail.cpp       | 55
-rw-r--r--  src/mongo/db/repl/sync_tail.h         |  5
-rw-r--r--  src/mongo/db/repl/sync_tail_test.cpp  |  9
-rw-r--r--  src/mongo/dbtests/repltests.cpp       | 49
5 files changed, 77 insertions(+), 44 deletions(-)
diff --git a/src/mongo/db/repl/master_slave.cpp b/src/mongo/db/repl/master_slave.cpp
index 5cac46ad48c..08961a2c840 100644
--- a/src/mongo/db/repl/master_slave.cpp
+++ b/src/mongo/db/repl/master_slave.cpp
@@ -674,7 +674,8 @@ void ReplSource::applyOperation(OperationContext* opCtx, Database* db, const BSO
// sync source.
SyncTail sync(nullptr, SyncTail::MultiSyncApplyFunc());
sync.setHostname(hostName);
- sync.fetchAndInsertMissingDocument(opCtx, op);
+ OplogEntry oplogEntry(op);
+ sync.fetchAndInsertMissingDocument(opCtx, oplogEntry);
}
} catch (AssertionException& e) {
log() << "sync: caught user assertion " << redact(e) << " while applying op: " << redact(op)
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index f25e398443a..cfee963f0f8 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -272,13 +272,13 @@ void ApplyBatchFinalizerForJournal::_run() {
}
}
-NamespaceString parseUUIDOrNs(OperationContext* opCtx, const BSONObj& o) {
- auto statusWithUUID = UUID::parse(o.getField("ui"));
- if (!statusWithUUID.isOK()) {
- return NamespaceString(o.getStringField("ns"));
+NamespaceString parseUUIDOrNs(OperationContext* opCtx, const OplogEntry& oplogEntry) {
+ auto optionalUuid = oplogEntry.getUuid();
+ if (!optionalUuid) {
+ return oplogEntry.getNamespace();
}
- const auto& uuid = statusWithUUID.getValue();
+ const auto& uuid = optionalUuid.get();
auto& catalog = UUIDCatalog::get(opCtx);
auto nss = catalog.lookupNSSByUUID(uuid);
uassert(ErrorCodes::NamespaceNotFound,
@@ -1077,7 +1077,7 @@ OldThreadPool* SyncTail::getWriterPool() {
return _writerPool.get();
}
-BSONObj SyncTail::getMissingDoc(OperationContext* opCtx, const BSONObj& o) {
+BSONObj SyncTail::getMissingDoc(OperationContext* opCtx, const OplogEntry& oplogEntry) {
OplogReader missingObjReader; // why are we using OplogReader to run a non-oplog query?
if (MONGO_FAIL_POINT(initialSyncHangBeforeGettingMissingDocument)) {
@@ -1108,26 +1108,25 @@ BSONObj SyncTail::getMissingDoc(OperationContext* opCtx, const BSONObj& o) {
}
// get _id from oplog entry to create query to fetch document.
- const BSONElement opElem = o.getField("op");
- const bool isUpdate = !opElem.eoo() && opElem.str() == "u";
- const BSONElement idElem = o.getObjectField(isUpdate ? "o2" : "o")["_id"];
+ const auto idElem = oplogEntry.getIdElement();
if (idElem.eoo()) {
- severe() << "cannot fetch missing document without _id field: " << redact(o);
+ severe() << "cannot fetch missing document without _id field: "
+ << redact(oplogEntry.toBSON());
fassertFailedNoTrace(28742);
}
BSONObj query = BSONObjBuilder().append(idElem).obj();
BSONObj missingObj;
- const char* ns = o.getStringField("ns");
+ auto nss = oplogEntry.getNamespace();
try {
- if (o.getField("ui").eoo()) {
- missingObj = missingObjReader.findOne(ns, query);
+ auto uuid = oplogEntry.getUuid();
+ if (!uuid) {
+ missingObj = missingObjReader.findOne(nss.ns().c_str(), query);
} else {
- auto uuid = uassertStatusOK(UUID::parse(o.getField("ui")));
- auto dbname = nsToDatabaseSubstring(ns);
+ auto dbname = nss.db();
// If a UUID exists for the command object, find the document by UUID.
- missingObj = missingObjReader.findOneByUUID(dbname.toString(), uuid, query);
+ missingObj = missingObjReader.findOneByUUID(dbname.toString(), *uuid, query);
}
} catch (const SocketException&) {
warning() << "network problem detected while fetching a missing document from the "
@@ -1146,11 +1145,12 @@ BSONObj SyncTail::getMissingDoc(OperationContext* opCtx, const BSONObj& o) {
str::stream() << "Can no longer connect to initial sync source: " << _hostname);
}
-bool SyncTail::fetchAndInsertMissingDocument(OperationContext* opCtx, const BSONObj& o) {
+bool SyncTail::fetchAndInsertMissingDocument(OperationContext* opCtx,
+ const OplogEntry& oplogEntry) {
// Note that using the local UUID/NamespaceString mapping is sufficient for checking
// whether the collection is capped on the remote because convertToCapped creates a
// new collection with a different UUID.
- const NamespaceString nss(parseUUIDOrNs(opCtx, o));
+ const NamespaceString nss(parseUUIDOrNs(opCtx, oplogEntry));
{
// If the document is in a capped collection then it's okay for it to be missing.
@@ -1162,14 +1162,17 @@ bool SyncTail::fetchAndInsertMissingDocument(OperationContext* opCtx, const BSON
}
}
- log() << "Fetching missing document: " << redact(o);
- BSONObj missingObj = getMissingDoc(opCtx, o);
+ log() << "Fetching missing document: " << redact(oplogEntry.toBSON());
+ BSONObj missingObj = getMissingDoc(opCtx, oplogEntry);
if (missingObj.isEmpty()) {
+ BSONObj object2;
+ if (auto optionalObject2 = oplogEntry.getObject2()) {
+ object2 = *optionalObject2;
+ }
log() << "Missing document not found on source; presumably deleted later in oplog. o first "
"field: "
- << o.getObjectField("o").firstElementFieldName()
- << ", o2: " << redact(o.getObjectField("o2"));
+ << redact(oplogEntry.getObject()) << ", o2: " << redact(object2);
return false;
}
@@ -1182,7 +1185,8 @@ bool SyncTail::fetchAndInsertMissingDocument(OperationContext* opCtx, const BSON
WriteUnitOfWork wunit(opCtx);
Collection* coll = nullptr;
- if (o.getField("ui").eoo()) {
+ auto uuid = oplogEntry.getUuid();
+ if (!uuid) {
if (!db) {
return false;
}
@@ -1190,9 +1194,8 @@ bool SyncTail::fetchAndInsertMissingDocument(OperationContext* opCtx, const BSON
} else {
// If the oplog entry has a UUID, use it to find the collection in which to insert the
// missing document.
- auto uuid = uassertStatusOK(UUID::parse(o.getField("ui")));
auto& catalog = UUIDCatalog::get(opCtx);
- coll = catalog.lookupCollectionByUUID(uuid);
+ coll = catalog.lookupCollectionByUUID(*uuid);
if (!coll) {
// TODO(SERVER-30819) insert this UUID into the missing UUIDs set.
return false;
@@ -1447,7 +1450,7 @@ Status multiInitialSyncApply_noAbort(OperationContext* opCtx,
// We might need to fetch the missing docs from the sync source.
fetchCount->fetchAndAdd(1);
- st->fetchAndInsertMissingDocument(opCtx, entry.raw);
+ st->fetchAndInsertMissingDocument(opCtx, entry);
}
} catch (const DBException& e) {
// SERVER-24927 If we have a NamespaceNotFound exception, then this document will be
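Condensing the getMissingDoc()/fetchAndInsertMissingDocument() changes above: the _id query is now built from OplogEntry::getIdElement(), and the presence of a collection UUID decides between the by-UUID and by-namespace lookups. In this sketch, fetchByUuid and fetchByNs are hypothetical stand-ins for the OplogReader::findOneByUUID() and findOne() calls in the real code, and all error handling is elided.

    BSONObj fetchMissingDocSketch(const OplogEntry& oplogEntry) {
        // For updates the _id comes from the "o2" object, otherwise from "o".
        const auto idElem = oplogEntry.getIdElement();
        invariant(!idElem.eoo());  // the real code logs and fasserts (28742) instead

        const BSONObj query = BSONObjBuilder().append(idElem).obj();
        const auto nss = oplogEntry.getNamespace();

        if (auto uuid = oplogEntry.getUuid()) {
            // Entries carrying a collection UUID are resolved by UUID on the sync source.
            return fetchByUuid(nss.db().toString(), *uuid, query);
        }
        // Entries without a UUID fall back to the namespace string.
        return fetchByNs(nss.ns(), query);
    }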
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index e082f5cb52e..c5c4d83002b 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -204,14 +204,15 @@ public:
/**
* Fetch a single document referenced in the operation from the sync source.
*/
- virtual BSONObj getMissingDoc(OperationContext* opCtx, const BSONObj& o);
+ virtual BSONObj getMissingDoc(OperationContext* opCtx, const OplogEntry& oplogEntry);
/**
* If an update fails, fetches the missing document and inserts it into the local collection.
*
* Returns true if the document was fetched and inserted successfully.
*/
- virtual bool fetchAndInsertMissingDocument(OperationContext* opCtx, const BSONObj& o);
+ virtual bool fetchAndInsertMissingDocument(OperationContext* opCtx,
+ const OplogEntry& oplogEntry);
void setHostname(const std::string& hostname);
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index 3cd5f41d68c..aa2db8e14aa 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -107,7 +107,7 @@ repl::OplogEntry makeOplogEntry(StringData ns) {
class SyncTailWithLocalDocumentFetcher : public SyncTail {
public:
SyncTailWithLocalDocumentFetcher(const BSONObj& document);
- BSONObj getMissingDoc(OperationContext* opCtx, const BSONObj& o) override;
+ BSONObj getMissingDoc(OperationContext* opCtx, const OplogEntry& oplogEntry) override;
private:
BSONObj _document;
@@ -119,13 +119,14 @@ private:
class SyncTailWithOperationContextChecker : public SyncTail {
public:
SyncTailWithOperationContextChecker();
- bool fetchAndInsertMissingDocument(OperationContext* opCtx, const BSONObj& o) override;
+ bool fetchAndInsertMissingDocument(OperationContext* opCtx,
+ const OplogEntry& oplogEntry) override;
};
SyncTailWithLocalDocumentFetcher::SyncTailWithLocalDocumentFetcher(const BSONObj& document)
: SyncTail(nullptr, SyncTail::MultiSyncApplyFunc(), nullptr), _document(document) {}
-BSONObj SyncTailWithLocalDocumentFetcher::getMissingDoc(OperationContext*, const BSONObj&) {
+BSONObj SyncTailWithLocalDocumentFetcher::getMissingDoc(OperationContext*, const OplogEntry&) {
return _document;
}
@@ -133,7 +134,7 @@ SyncTailWithOperationContextChecker::SyncTailWithOperationContextChecker()
: SyncTail(nullptr, SyncTail::MultiSyncApplyFunc(), nullptr) {}
bool SyncTailWithOperationContextChecker::fetchAndInsertMissingDocument(OperationContext* opCtx,
- const BSONObj&) {
+ const OplogEntry&) {
ASSERT_FALSE(opCtx->writesAreReplicated());
ASSERT_FALSE(opCtx->lockState()->shouldConflictWithSecondaryBatchApplication());
ASSERT_TRUE(documentValidationDisabled(opCtx));
diff --git a/src/mongo/dbtests/repltests.cpp b/src/mongo/dbtests/repltests.cpp
index 37e04a96a68..c8d076480bf 100644
--- a/src/mongo/dbtests/repltests.cpp
+++ b/src/mongo/dbtests/repltests.cpp
@@ -64,6 +64,31 @@ using std::string;
using std::stringstream;
using std::vector;
+/**
+ * Creates an OplogEntry with given parameters and preset defaults for this test suite.
+ */
+repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
+ repl::OpTypeEnum opType,
+ NamespaceString nss,
+ BSONObj object,
+ boost::optional<BSONObj> object2) {
+ return repl::OplogEntry(opTime, // optime
+ 0, // hash
+ opType, // opType
+ nss, // namespace
+ boost::none, // uuid
+ boost::none, // fromMigrate
+ OplogEntry::kOplogVersion, // version
+ object, // o
+ object2, // o2
+ {}, // sessionInfo
+ boost::none, // wall clock time
+ boost::none, // statement id
+ boost::none, // optime of previous write within same transaction
+ boost::none, // pre-image optime
+ boost::none); // post-image optime
+}
+
BSONObj f(const char* s) {
return fromjson(s);
}
@@ -1324,7 +1349,7 @@ public:
bool returnEmpty;
SyncTest() : SyncTail(nullptr, SyncTail::MultiSyncApplyFunc()), returnEmpty(false) {}
virtual ~SyncTest() {}
- virtual BSONObj getMissingDoc(OperationContext* opCtx, const BSONObj& o) {
+ BSONObj getMissingDoc(OperationContext* opCtx, const OplogEntry& oplogEntry) override {
if (returnEmpty) {
BSONObj o;
return o;
@@ -1340,13 +1365,15 @@ class FetchAndInsertMissingDocument : public Base {
public:
void run() {
bool threw = false;
- BSONObj o = BSON("ns" << ns() << "o" << BSON("foo"
- << "bar")
- << "o2"
- << BSON("_id"
- << "in oplog"
- << "foo"
- << "bar"));
+ auto oplogEntry = makeOplogEntry(OpTime(Timestamp(100, 1), 1LL), // optime
+ OpTypeEnum::kUpdate, // op type
+ NamespaceString(ns()), // namespace
+ BSON("foo"
+ << "bar"), // o
+ BSON("_id"
+ << "in oplog"
+ << "foo"
+ << "bar")); // o2
Lock::GlobalWrite lk(&_opCtx);
@@ -1356,7 +1383,7 @@ public:
badSource.setHostname("localhost:123");
OldClientContext ctx(&_opCtx, ns());
- badSource.getMissingDoc(&_opCtx, o);
+ badSource.getMissingDoc(&_opCtx, oplogEntry);
} catch (DBException&) {
threw = true;
}
@@ -1364,7 +1391,7 @@ public:
// now this should succeed
SyncTest t;
- verify(t.fetchAndInsertMissingDocument(&_opCtx, o));
+ verify(t.fetchAndInsertMissingDocument(&_opCtx, oplogEntry));
verify(!_client
.findOne(ns(),
BSON("_id"
@@ -1373,7 +1400,7 @@ public:
// force it not to find an obj
t.returnEmpty = true;
- verify(!t.fetchAndInsertMissingDocument(&_opCtx, o));
+ verify(!t.fetchAndInsertMissingDocument(&_opCtx, oplogEntry));
}
};
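For completeness, the makeOplogEntry() helper added to repltests.cpp can also build entries for op types other than updates; a hypothetical example follows (OpTypeEnum::kInsert and the literal namespace string are assumptions, since only kUpdate appears in this patch):

    // Hypothetical: an insert-type entry with no "o2" object, built with the
    // same helper and defaults as the update entry in the test above.
    auto insertEntry = makeOplogEntry(OpTime(Timestamp(100, 2), 1LL),          // optime
                                      OpTypeEnum::kInsert,                     // op type (assumed enum value)
                                      NamespaceString("unittests.repltests"),  // namespace (illustrative)
                                      BSON("_id" << 1 << "foo" << "bar"),      // o
                                      boost::none);                            // o2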