diff options
-rw-r--r-- | src/mongo/db/repl/oplog_entry.cpp | 35 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.h | 34 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 40 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test.cpp | 50 |
4 files changed, 88 insertions, 71 deletions
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp index 46cba75c41c..7ec2db33ab1 100644 --- a/src/mongo/db/repl/oplog_entry.cpp +++ b/src/mongo/db/repl/oplog_entry.cpp @@ -44,54 +44,57 @@ OplogEntry::OplogEntry(BSONObj rawInput) : raw(std::move(rawInput)) { raw = raw.copy(); } + BSONElement version; for (auto elem : raw) { const auto name = elem.fieldNameStringData(); if (name == "ns") { - ns = elem.valuestrsafe(); + _ns = NamespaceString(elem.valuestrsafe()); } else if (name == "op") { - opType = elem.valuestrsafe(); + _opType = elem.valuestrsafe(); } else if (name == "o2") { - o2 = elem; + _o2 = elem.Obj(); } else if (name == "ts") { - ts = elem; + _ts = elem.timestamp(); } else if (name == "v") { version = elem; } else if (name == "o") { - o = elem; + _o = elem.Obj(); } } + + _version = version.eoo() ? 1 : version.numberInt(); } bool OplogEntry::isCommand() const { - return opType[0] == 'c'; + return getOpType()[0] == 'c'; } bool OplogEntry::isCrudOpType() const { - switch (opType[0]) { + switch (getOpType()[0]) { case 'd': case 'i': case 'u': - return opType[1] == 0; + return getOpType()[1] == 0; } return false; } bool OplogEntry::hasNamespace() const { - return !ns.empty(); + return !getNamespace().isEmpty(); } int OplogEntry::getVersion() const { - return version.eoo() ? 1 : version.Int(); + return _version; } BSONElement OplogEntry::getIdElement() const { invariant(isCrudOpType()); - switch (opType[0]) { + switch (getOpType()[0]) { case 'u': - return o2.Obj()["_id"]; + return getObject2()["_id"]; case 'd': case 'i': - return o.Obj()["_id"]; + return getObject()["_id"]; } MONGO_UNREACHABLE; } @@ -100,12 +103,8 @@ OpTime OplogEntry::getOpTime() const { return fassertStatusOK(34436, OpTime::parseFromOplogEntry(raw)); } -Seconds OplogEntry::getTimestampSecs() const { - return Seconds(ts.timestamp().getSecs()); -} - StringData OplogEntry::getCollectionName() const { - return nsToCollectionSubstring(ns); + return getNamespace().coll(); } std::string OplogEntry::toString() const { diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h index f7c59d32c69..4e7b3d67839 100644 --- a/src/mongo/db/repl/oplog_entry.h +++ b/src/mongo/db/repl/oplog_entry.h @@ -30,6 +30,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/bson/simple_bsonobj_comparator.h" +#include "mongo/db/namespace_string.h" #include "mongo/db/repl/optime.h" namespace mongo { @@ -65,13 +66,34 @@ struct OplogEntry { BSONObj raw; // Owned. - StringData ns = ""; - StringData opType = ""; + // TODO: Remove these when we add the IDL. + const NamespaceString& getNamespace() const { + return _ns; + } - BSONElement version; - BSONElement o; - BSONElement o2; - BSONElement ts; + StringData getOpType() const { + return _opType; + } + + Timestamp getTimestamp() const { + return _ts; + } + + const BSONObj& getObject() const { + return _o; + } + + const BSONObj& getObject2() const { + return _o2; + } + +private: + NamespaceString _ns; + StringData _opType = ""; + int _version; + Timestamp _ts; + BSONObj _o; + BSONObj _o2; }; std::ostream& operator<<(std::ostream& s, const OplogEntry& o); diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index acfe9915aa6..54364c9c839 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -583,7 +583,7 @@ void fillWriterVectors(OperationContext* opCtx, CachedCollectionProperties collPropertiesCache; for (auto&& op : *ops) { - StringMapTraits::HashedKey hashedNs(op.ns); + StringMapTraits::HashedKey hashedNs(op.getNamespace().ns()); uint32_t hash = hashedNs.hash(); if (op.isCrudOpType()) { @@ -602,7 +602,7 @@ void fillWriterVectors(OperationContext* opCtx, MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash); } - if (op.opType == "i" && collProperties.isCapped) { + if (op.getOpType() == "i" && collProperties.isCapped) { // Mark capped collection ops before storing them to ensure we do not attempt to // bulk insert them. op.isForCappedCollection = true; @@ -871,14 +871,7 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx, if (!entry.raw.isEmpty()) { // check for oplog version change - int curVersion = 0; - if (entry.version.eoo()) { - // missing version means version 1 - curVersion = 1; - } else { - curVersion = entry.version.Int(); - } - + int curVersion = entry.getVersion(); if (curVersion != OplogEntry::kOplogVersion) { severe() << "expected oplog version " << OplogEntry::kOplogVersion << " but found version " << curVersion @@ -887,8 +880,8 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx, } } - if (limits.slaveDelayLatestTimestamp && - entry.ts.timestampTime() > *limits.slaveDelayLatestTimestamp) { + auto entryTime = Date_t::fromDurationSinceEpoch(Seconds(entry.getTimestamp().getSecs())); + if (limits.slaveDelayLatestTimestamp && entryTime > *limits.slaveDelayLatestTimestamp) { ops->pop_back(); // Don't do this op yet. if (ops->empty()) { @@ -900,11 +893,11 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx, } // Check for ops that must be processed one at a time. - if (entry.raw.isEmpty() || // sentinel that network queue is drained. - (entry.opType[0] == 'c') || // commands. + if (entry.raw.isEmpty() || // sentinel that network queue is drained. + (entry.getOpType()[0] == 'c') || // commands. // Index builds are achieved through the use of an insert op, not a command op. // The following line is the same as what the insert code uses to detect an index build. - (!entry.ns.empty() && nsToCollectionSubstring(entry.ns) == "system.indexes")) { + (!entry.getNamespace().isEmpty() && entry.getNamespace().coll() == "system.indexes")) { if (ops->getCount() == 1) { // apply commands one-at-a-time _networkQueue->consume(opCtx); @@ -1070,7 +1063,9 @@ Status multiSyncApply_noAbort(OperationContext* opCtx, if (oplogEntryPointers->size() > 1) { std::stable_sort(oplogEntryPointers->begin(), oplogEntryPointers->end(), - [](const OplogEntry* l, const OplogEntry* r) { return l->ns < r->ns; }); + [](const OplogEntry* l, const OplogEntry* r) { + return l->getNamespace() < r->getNamespace(); + }); } // This function is only called in steady state replication. @@ -1084,7 +1079,7 @@ Status multiSyncApply_noAbort(OperationContext* opCtx, oplogEntriesIterator != oplogEntryPointers->end(); ++oplogEntriesIterator) { auto entry = *oplogEntriesIterator; - if (entry->opType[0] == 'i' && !entry->isForCappedCollection && + if (entry->getOpType()[0] == 'i' && !entry->isForCappedCollection && oplogEntriesIterator > doNotGroupBeforePoint) { // Attempt to group inserts if possible. std::vector<BSONObj> toInsert; @@ -1094,10 +1089,11 @@ Status multiSyncApply_noAbort(OperationContext* opCtx, oplogEntriesIterator + 1, oplogEntryPointers->end(), [&](const OplogEntry* nextEntry) { - return nextEntry->opType[0] != 'i' || // Must be an insert. - nextEntry->ns != entry->ns || // Must be the same namespace. + return nextEntry->getOpType()[0] != 'i' || // Must be an insert. + nextEntry->getNamespace() != + entry->getNamespace() || // Must be the same namespace. // Must not create too large an object. - (batchSize += nextEntry->o.Obj().objsize()) > insertVectorMaxBytes || + (batchSize += nextEntry->getObject().objsize()) > insertVectorMaxBytes || ++batchCount >= 64; // Or have too many entries. }); @@ -1117,7 +1113,7 @@ Status multiSyncApply_noAbort(OperationContext* opCtx, for (auto groupingIterator = oplogEntriesIterator; groupingIterator != endOfGroupableOpsIterator; ++groupingIterator) { - insertArrayBuilder.append((*groupingIterator)->o.Obj()); + insertArrayBuilder.append((*groupingIterator)->getObject()); } insertArrayBuilder.done(); @@ -1280,7 +1276,7 @@ StatusWith<OpTime> multiApply(OperationContext* opCtx, std::vector<MultiApplier::OperationPtrs> writerVectors(workerPool->getNumThreads()); ON_BLOCK_EXIT([&] { workerPool->join(); }); - storage->setOplogDeleteFromPoint(opCtx, ops.front().ts.timestamp()); + storage->setOplogDeleteFromPoint(opCtx, ops.front().getTimestamp()); scheduleWritesToOplog(opCtx, workerPool, ops); fillWriterVectors(opCtx, &ops, &writerVectors); diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index b9853a776e5..9a83faa52d4 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -718,9 +718,9 @@ TEST_F(SyncTailTest, MultiSyncApplyGroupsInsertOperationByNamespaceBeforeApplyin auto insertOp1b = makeOp(nss1); auto insertOp2a = makeOp(nss2); auto insertOp2b = makeOp(nss2); - MultiApplier::Operations operationsApplied; + std::vector<BSONObj> operationsApplied; auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) { - operationsApplied.push_back(OplogEntry(op)); + operationsApplied.push_back(op.copy()); return Status::OK(); }; @@ -729,26 +729,26 @@ TEST_F(SyncTailTest, MultiSyncApplyGroupsInsertOperationByNamespaceBeforeApplyin ASSERT_OK(multiSyncApply_noAbort(_opCtx.get(), &ops, syncApply)); ASSERT_EQUALS(4U, operationsApplied.size()); - ASSERT_EQUALS(createOp1, operationsApplied[0]); - ASSERT_EQUALS(createOp2, operationsApplied[1]); + ASSERT_BSONOBJ_EQ(createOp1.raw, operationsApplied[0]); + ASSERT_BSONOBJ_EQ(createOp2.raw, operationsApplied[1]); // Check grouped insert operations in namespace "nss1". - ASSERT_EQUALS(insertOp1a.getOpTime(), operationsApplied[2].getOpTime()); - ASSERT_EQUALS(insertOp1a.ns, operationsApplied[2].ns); - ASSERT_EQUALS(BSONType::Array, operationsApplied[2].o.type()); - auto group1 = operationsApplied[2].o.Array(); + ASSERT_EQUALS(insertOp1a.getOpTime(), OpTime::parseFromOplogEntry(operationsApplied[2])); + ASSERT_EQUALS(insertOp1a.getNamespace().ns(), operationsApplied[2]["ns"].valuestrsafe()); + ASSERT_EQUALS(BSONType::Array, operationsApplied[2]["o"].type()); + auto group1 = operationsApplied[2]["o"].Array(); ASSERT_EQUALS(2U, group1.size()); - ASSERT_BSONOBJ_EQ(insertOp1a.o.Obj(), group1[0].Obj()); - ASSERT_BSONOBJ_EQ(insertOp1b.o.Obj(), group1[1].Obj()); + ASSERT_BSONOBJ_EQ(insertOp1a.getObject(), group1[0].Obj()); + ASSERT_BSONOBJ_EQ(insertOp1b.getObject(), group1[1].Obj()); // Check grouped insert operations in namespace "nss2". - ASSERT_EQUALS(insertOp2a.getOpTime(), operationsApplied[3].getOpTime()); - ASSERT_EQUALS(insertOp2a.ns, operationsApplied[3].ns); - ASSERT_EQUALS(BSONType::Array, operationsApplied[3].o.type()); - auto group2 = operationsApplied[3].o.Array(); + ASSERT_EQUALS(insertOp2a.getOpTime(), OpTime::parseFromOplogEntry(operationsApplied[3])); + ASSERT_EQUALS(insertOp2a.getNamespace().ns(), operationsApplied[3]["ns"].valuestrsafe()); + ASSERT_EQUALS(BSONType::Array, operationsApplied[3]["o"].type()); + auto group2 = operationsApplied[3]["o"].Array(); ASSERT_EQUALS(2U, group2.size()); - ASSERT_BSONOBJ_EQ(insertOp2a.o.Obj(), group2[0].Obj()); - ASSERT_BSONOBJ_EQ(insertOp2b.o.Obj(), group2[1].Obj()); + ASSERT_BSONOBJ_EQ(insertOp2a.getObject(), group2[0].Obj()); + ASSERT_BSONOBJ_EQ(insertOp2b.getObject(), group2[1].Obj()); } TEST_F(SyncTailTest, MultiSyncApplyUsesLimitWhenGroupingInsertOperation) { @@ -770,9 +770,9 @@ TEST_F(SyncTailTest, MultiSyncApplyUsesLimitWhenGroupingInsertOperation) { MultiApplier::Operations operationsToApply; operationsToApply.push_back(createOp); std::copy(insertOps.begin(), insertOps.end(), std::back_inserter(operationsToApply)); - MultiApplier::Operations operationsApplied; + std::vector<BSONObj> operationsApplied; auto syncApply = [&operationsApplied](OperationContext*, const BSONObj& op, bool) { - operationsApplied.push_back(OplogEntry(op)); + operationsApplied.push_back(op.copy()); return Status::OK(); }; @@ -785,21 +785,21 @@ TEST_F(SyncTailTest, MultiSyncApplyUsesLimitWhenGroupingInsertOperation) { // multiSyncApply should combine operations as follows: // {create}, {grouped_insert}, {insert_(limit+1)} ASSERT_EQUALS(3U, operationsApplied.size()); - ASSERT_EQUALS(createOp, operationsApplied[0]); + ASSERT_BSONOBJ_EQ(createOp.raw, operationsApplied[0]); const auto& groupedInsertOp = operationsApplied[1]; - ASSERT_EQUALS(insertOps.front().getOpTime(), groupedInsertOp.getOpTime()); - ASSERT_EQUALS(insertOps.front().ns, groupedInsertOp.ns); - ASSERT_EQUALS(BSONType::Array, groupedInsertOp.o.type()); - auto groupedInsertDocuments = groupedInsertOp.o.Array(); + ASSERT_EQUALS(insertOps.front().getOpTime(), OpTime::parseFromOplogEntry(groupedInsertOp)); + ASSERT_EQUALS(insertOps.front().getNamespace().ns(), groupedInsertOp["ns"].valuestrsafe()); + ASSERT_EQUALS(BSONType::Array, groupedInsertOp["o"].type()); + auto groupedInsertDocuments = groupedInsertOp["o"].Array(); ASSERT_EQUALS(limit, groupedInsertDocuments.size()); for (std::size_t i = 0; i < limit; ++i) { const auto& insertOp = insertOps[i]; - ASSERT_BSONOBJ_EQ(insertOp.o.Obj(), groupedInsertDocuments[i].Obj()); + ASSERT_BSONOBJ_EQ(insertOp.getObject(), groupedInsertDocuments[i].Obj()); } // (limit + 1)-th insert operations should not be included in group of first (limit) inserts. - ASSERT_EQUALS(insertOps.back(), operationsApplied[2]); + ASSERT_BSONOBJ_EQ(insertOps.back().raw, operationsApplied[2]); } TEST_F(SyncTailTest, MultiSyncApplyFallsBackOnApplyingInsertsIndividuallyWhenGroupedInsertFails) { |