diff options
author | Benety Goh <benety@mongodb.com> | 2016-10-19 21:55:52 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2016-10-21 19:27:12 -0400 |
commit | 9700b08a0cfaafaca6bac9efa5f188b0acf0f464 (patch) | |
tree | 57c01e361e7bd63093e4e8854dea8c914064994e | |
parent | 864ac6c37abe024239beba37bf319438631487ae (diff) | |
download | mongo-9700b08a0cfaafaca6bac9efa5f188b0acf0f464.tar.gz |
SERVER-26666 OplogBufferCollection saves sentinels in collection
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_collection.cpp | 33 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_collection.h | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_collection_test.cpp | 402 |
3 files changed, 137 insertions, 303 deletions
diff --git a/src/mongo/db/repl/oplog_buffer_collection.cpp b/src/mongo/db/repl/oplog_buffer_collection.cpp index 1e3df806934..bf7ca8bdd73 100644 --- a/src/mongo/db/repl/oplog_buffer_collection.cpp +++ b/src/mongo/db/repl/oplog_buffer_collection.cpp @@ -99,14 +99,6 @@ void OplogBufferCollection::shutdown(OperationContext* txn) { } void OplogBufferCollection::pushEvenIfFull(OperationContext* txn, const Value& value) { - // This oplog entry is a sentinel - if (value.isEmpty()) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - _sentinels.push(_lastPushedTimestamp); - _count++; - _cvNoLongerEmpty.notify_all(); - return; - } Batch valueBatch = {value}; pushAllNonBlocking(txn, valueBatch.begin(), valueBatch.end()); } @@ -173,7 +165,6 @@ void OplogBufferCollection::clear(OperationContext* txn) { _createCollection(txn); _size = 0; _count = 0; - std::queue<Timestamp>().swap(_sentinels); _sentinelCount = 0; _lastPushedTimestamp = {}; _lastPoppedKey = {}; @@ -219,15 +210,6 @@ boost::optional<OplogBuffer::Value> OplogBufferCollection::lastObjectPushed( } bool OplogBufferCollection::_pop_inlock(OperationContext* txn, Value* value) { - // If there is a sentinel, and it was pushed right after the last BSONObj to be popped was - // pushed, then we pop off a sentinel instead and decrease the count by 1. - if (!_sentinels.empty() && (_getLastPoppedTimestamp_inlock() == _sentinels.front())) { - _sentinels.pop(); - _count--; - *value = BSONObj(); - return true; - } - BSONObj docFromCollection; if (!_peekOneSide_inlock( txn, &docFromCollection, true, PeekMode::kReturnUnmodifiedDocumentFromCollection)) { @@ -249,12 +231,6 @@ bool OplogBufferCollection::_peekOneSide_inlock(OperationContext* txn, PeekMode peekMode) const { invariant(_count > 0); - // If there is a sentinel, and it was pushed right after the last BSONObj to be popped was - // pushed, then we return an empty BSONObj for the sentinel. - if (!_sentinels.empty() && (_getLastPoppedTimestamp_inlock() == _sentinels.front())) { - *value = BSONObj(); - return true; - } auto scanDirection = front ? StorageInterface::ScanDirection::kForward : StorageInterface::ScanDirection::kBackward; BSONObj startKey; @@ -295,11 +271,6 @@ void OplogBufferCollection::_dropCollection(OperationContext* txn) { fassert(40155, _storageInterface->dropCollection(txn, _nss)); } -std::queue<Timestamp> OplogBufferCollection::getSentinels_forTest() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return _sentinels; -} - std::size_t OplogBufferCollection::getSentinelCount_forTest() const { return _sentinelCount; } @@ -311,10 +282,6 @@ Timestamp OplogBufferCollection::getLastPushedTimestamp_forTest() const { Timestamp OplogBufferCollection::getLastPoppedTimestamp_forTest() const { stdx::lock_guard<stdx::mutex> lk(_mutex); - return _getLastPoppedTimestamp_inlock(); -} - -Timestamp OplogBufferCollection::_getLastPoppedTimestamp_inlock() const { return _lastPoppedKey.isEmpty() ? Timestamp() : _lastPoppedKey[""].Obj()["ts"].timestamp(); } diff --git a/src/mongo/db/repl/oplog_buffer_collection.h b/src/mongo/db/repl/oplog_buffer_collection.h index 5e08983ac5d..f989fff8dc6 100644 --- a/src/mongo/db/repl/oplog_buffer_collection.h +++ b/src/mongo/db/repl/oplog_buffer_collection.h @@ -115,7 +115,6 @@ public: boost::optional<Value> lastObjectPushed(OperationContext* txn) const override; // ---- Testing API ---- - std::queue<Timestamp> getSentinels_forTest() const; std::size_t getSentinelCount_forTest() const; Timestamp getLastPushedTimestamp_forTest() const; Timestamp getLastPoppedTimestamp_forTest() const; @@ -150,8 +149,6 @@ private: */ bool _pop_inlock(OperationContext* txn, Value* value); - Timestamp _getLastPoppedTimestamp_inlock() const; - // The namespace for the oplog buffer collection. const NamespaceString _nss; @@ -168,8 +165,6 @@ private: // Size of documents in buffer. std::size_t _size; - std::queue<Timestamp> _sentinels; - // Number of sentinel values inserted so far with the same timestamp as '_lastPoppedKey'. std::size_t _sentinelCount = 0; diff --git a/src/mongo/db/repl/oplog_buffer_collection_test.cpp b/src/mongo/db/repl/oplog_buffer_collection_test.cpp index 205042d2837..11d6030d472 100644 --- a/src/mongo/db/repl/oplog_buffer_collection_test.cpp +++ b/src/mongo/db/repl/oplog_buffer_collection_test.cpp @@ -247,6 +247,36 @@ DEATH_TEST_F(OplogBufferCollectionTest, OplogBufferCollection::addIdToDocument(BSON("x" << 1), {}, 0); } +/** + * Check collection contents. OplogInterface returns documents in reverse natural order. + */ +void _assertDocumentsInCollectionEquals(OperationContext* txn, + const NamespaceString& nss, + const std::vector<BSONObj>& docs) { + std::vector<BSONObj> reversedTransformedDocs; + Timestamp ts; + std::size_t sentinelCount = 0; + for (const auto& doc : docs) { + auto previousTimestamp = ts; + BSONObj newDoc; + std::tie(newDoc, ts, sentinelCount) = + OplogBufferCollection::addIdToDocument(doc, ts, sentinelCount); + reversedTransformedDocs.push_back(newDoc); + if (doc.isEmpty()) { + ASSERT_EQUALS(previousTimestamp, ts); + continue; + } + ASSERT_GT(ts, previousTimestamp); + } + std::reverse(reversedTransformedDocs.begin(), reversedTransformedDocs.end()); + OplogInterfaceLocal oplog(txn, nss.ns()); + auto iter = oplog.makeIterator(); + for (const auto& doc : reversedTransformedDocs) { + ASSERT_BSONOBJ_EQ(doc, unittest::assertGet(iter->next()).first); + } + ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus()); +} + TEST_F(OplogBufferCollectionTest, PushOneDocumentWithPushAllNonBlockingAddsDocument) { auto nss = makeNamespace(_agent); OplogBufferCollection oplogBuffer(_storageInterface, nss); @@ -257,14 +287,7 @@ TEST_F(OplogBufferCollectionTest, PushOneDocumentWithPushAllNonBlockingAddsDocum oplogBuffer.pushAllNonBlocking(_txn.get(), oplog.begin(), oplog.end()); ASSERT_EQUALS(oplogBuffer.getCount(), 1UL); - { - OplogInterfaceLocal collectionReader(_txn.get(), nss.ns()); - auto iter = collectionReader.makeIterator(); - ASSERT_BSONOBJ_EQ(oplog[0], - OplogBufferCollection::extractEmbeddedOplogDocument( - unittest::assertGet(iter->next()).first)); - ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus()); - } + _assertDocumentsInCollectionEquals(_txn.get(), nss, oplog); } TEST_F(OplogBufferCollectionTest, @@ -289,14 +312,7 @@ TEST_F(OplogBufferCollectionTest, PushOneDocumentWithPushAddsDocument) { oplogBuffer.push(_txn.get(), oplog); ASSERT_EQUALS(oplogBuffer.getCount(), 1UL); - { - OplogInterfaceLocal collectionReader(_txn.get(), nss.ns()); - auto iter = collectionReader.makeIterator(); - ASSERT_BSONOBJ_EQ(oplog, - OplogBufferCollection::extractEmbeddedOplogDocument( - unittest::assertGet(iter->next()).first)); - ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus()); - } + _assertDocumentsInCollectionEquals(_txn.get(), nss, {oplog}); } TEST_F(OplogBufferCollectionTest, PushOneDocumentWithPushEvenIfFullAddsDocument) { @@ -309,17 +325,9 @@ TEST_F(OplogBufferCollectionTest, PushOneDocumentWithPushEvenIfFullAddsDocument) oplogBuffer.pushEvenIfFull(_txn.get(), oplog); ASSERT_EQUALS(oplogBuffer.getCount(), 1UL); - auto sentinels = oplogBuffer.getSentinels_forTest(); - ASSERT_EQUALS(sentinels.size(), 0UL); + ASSERT_EQUALS(0UL, oplogBuffer.getSentinelCount_forTest()); - { - OplogInterfaceLocal collectionReader(_txn.get(), nss.ns()); - auto iter = collectionReader.makeIterator(); - ASSERT_BSONOBJ_EQ(oplog, - OplogBufferCollection::extractEmbeddedOplogDocument( - unittest::assertGet(iter->next()).first)); - ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus()); - } + _assertDocumentsInCollectionEquals(_txn.get(), nss, {oplog}); } TEST_F(OplogBufferCollectionTest, PeekDoesNotRemoveDocument) { @@ -348,17 +356,7 @@ TEST_F(OplogBufferCollectionTest, PeekDoesNotRemoveDocument) { ASSERT_BSONOBJ_EQ(doc, oplog1); ASSERT_EQUALS(oplogBuffer.getCount(), 2UL); - { - OplogInterfaceLocal collectionReader(_txn.get(), nss.ns()); - auto iter = collectionReader.makeIterator(); - ASSERT_BSONOBJ_EQ(oplog2, - OplogBufferCollection::extractEmbeddedOplogDocument( - unittest::assertGet(iter->next()).first)); - ASSERT_BSONOBJ_EQ(oplog1, - OplogBufferCollection::extractEmbeddedOplogDocument( - unittest::assertGet(iter->next()).first)); - ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus()); - } + _assertDocumentsInCollectionEquals(_txn.get(), nss, {oplog1, oplog2}); } TEST_F(OplogBufferCollectionTest, PeekWithNoDocumentsReturnsFalse) { @@ -373,11 +371,7 @@ TEST_F(OplogBufferCollectionTest, PeekWithNoDocumentsReturnsFalse) { ASSERT_TRUE(doc.isEmpty()); ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); - { - OplogInterfaceLocal collectionReader(_txn.get(), nss.ns()); - auto iter = collectionReader.makeIterator(); - ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus()); - } + _assertDocumentsInCollectionEquals(_txn.get(), nss, {}); } TEST_F(OplogBufferCollectionTest, PopDoesNotRemoveDocumentFromCollection) { @@ -395,14 +389,7 @@ TEST_F(OplogBufferCollectionTest, PopDoesNotRemoveDocumentFromCollection) { ASSERT_BSONOBJ_EQ(doc, oplog); ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); - { - OplogInterfaceLocal collectionReader(_txn.get(), nss.ns()); - auto iter = collectionReader.makeIterator(); - ASSERT_BSONOBJ_EQ(oplog, - OplogBufferCollection::extractEmbeddedOplogDocument( - unittest::assertGet(iter->next()).first)); - ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus()); - } + _assertDocumentsInCollectionEquals(_txn.get(), nss, {oplog}); } TEST_F(OplogBufferCollectionTest, PopWithNoDocumentsReturnsFalse) { @@ -417,11 +404,7 @@ TEST_F(OplogBufferCollectionTest, PopWithNoDocumentsReturnsFalse) { ASSERT_TRUE(doc.isEmpty()); ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); - { - OplogInterfaceLocal collectionReader(_txn.get(), nss.ns()); - auto iter = collectionReader.makeIterator(); - ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus()); - } + _assertDocumentsInCollectionEquals(_txn.get(), nss, {}); } TEST_F(OplogBufferCollectionTest, PopAndPeekReturnDocumentsInOrder) { @@ -436,21 +419,7 @@ TEST_F(OplogBufferCollectionTest, PopAndPeekReturnDocumentsInOrder) { oplogBuffer.pushAllNonBlocking(_txn.get(), oplog.begin(), oplog.end()); ASSERT_EQUALS(oplogBuffer.getCount(), 3UL); - auto checkDocumentsInCollection = [&]() { - OplogInterfaceLocal collectionReader(_txn.get(), nss.ns()); - auto iter = collectionReader.makeIterator(); - ASSERT_BSONOBJ_EQ(oplog[2], - OplogBufferCollection::extractEmbeddedOplogDocument( - unittest::assertGet(iter->next()).first)); - ASSERT_BSONOBJ_EQ(oplog[1], - OplogBufferCollection::extractEmbeddedOplogDocument( - unittest::assertGet(iter->next()).first)); - ASSERT_BSONOBJ_EQ(oplog[0], - OplogBufferCollection::extractEmbeddedOplogDocument( - unittest::assertGet(iter->next()).first)); - ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus()); - }; - checkDocumentsInCollection(); + _assertDocumentsInCollectionEquals(_txn.get(), nss, oplog); BSONObj doc; ASSERT_TRUE(oplogBuffer.peek(_txn.get(), &doc)); @@ -478,7 +447,7 @@ TEST_F(OplogBufferCollectionTest, PopAndPeekReturnDocumentsInOrder) { ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); // tryPop does not remove documents from collection. - checkDocumentsInCollection(); + _assertDocumentsInCollectionEquals(_txn.get(), nss, oplog); } TEST_F(OplogBufferCollectionTest, LastObjectPushedReturnsNewestOplogEntry) { @@ -526,7 +495,6 @@ TEST_F(OplogBufferCollectionTest, ClearClearsCollection) { oplogBuffer.startup(_txn.get()); ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); ASSERT_EQUALS(oplogBuffer.getSize(), 0UL); - ASSERT_EQUALS(oplogBuffer.getSentinels_forTest().size(), 0UL); ASSERT_EQUALS(0U, oplogBuffer.getSentinelCount_forTest()); ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPushedTimestamp_forTest()); ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest()); @@ -535,53 +503,53 @@ TEST_F(OplogBufferCollectionTest, ClearClearsCollection) { oplogBuffer.push(_txn.get(), oplog); ASSERT_EQUALS(oplogBuffer.getCount(), 1UL); ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog.objsize())); - ASSERT_EQUALS(oplogBuffer.getSentinels_forTest().size(), 0UL); ASSERT_EQUALS(0U, oplogBuffer.getSentinelCount_forTest()); ASSERT_EQUALS(oplog["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp_forTest()); ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest()); + _assertDocumentsInCollectionEquals(_txn.get(), nss, {oplog}); + BSONObj sentinel; oplogBuffer.push(_txn.get(), sentinel); ASSERT_EQUALS(oplogBuffer.getCount(), 2UL); - ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog.objsize())); - ASSERT_EQUALS(oplogBuffer.getSentinels_forTest().size(), 1UL); - ASSERT_EQUALS(0U, oplogBuffer.getSentinelCount_forTest()); + ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog.objsize() + BSONObj().objsize())); + ASSERT_EQUALS(1U, oplogBuffer.getSentinelCount_forTest()); ASSERT_EQUALS(oplog["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp_forTest()); ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest()); + _assertDocumentsInCollectionEquals(_txn.get(), nss, {oplog, sentinel}); + BSONObj oplog2 = makeOplogEntry(2); oplogBuffer.push(_txn.get(), oplog2); ASSERT_EQUALS(oplogBuffer.getCount(), 3UL); - ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog.objsize() + oplog2.objsize())); - ASSERT_EQUALS(oplogBuffer.getSentinels_forTest().size(), 1UL); + ASSERT_EQUALS(oplogBuffer.getSize(), + std::size_t(oplog.objsize() + BSONObj().objsize() + oplog2.objsize())); ASSERT_EQUALS(0U, oplogBuffer.getSentinelCount_forTest()); ASSERT_EQUALS(oplog2["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp_forTest()); ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest()); + _assertDocumentsInCollectionEquals(_txn.get(), nss, {oplog, sentinel, oplog2}); + BSONObj poppedDoc; ASSERT_TRUE(oplogBuffer.tryPop(_txn.get(), &poppedDoc)); ASSERT_BSONOBJ_EQ(oplog, poppedDoc); ASSERT_EQUALS(oplogBuffer.getCount(), 2UL); - ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog.objsize())); - ASSERT_EQUALS(oplogBuffer.getSentinels_forTest().size(), 1UL); + ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(BSONObj().objsize() + oplog2.objsize())); ASSERT_EQUALS(0U, oplogBuffer.getSentinelCount_forTest()); ASSERT_EQUALS(oplog2["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp_forTest()); ASSERT_EQUALS(oplog["ts"].timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest()); + _assertDocumentsInCollectionEquals(_txn.get(), nss, {oplog, sentinel, oplog2}); + oplogBuffer.clear(_txn.get()); ASSERT_TRUE(AutoGetCollectionForRead(_txn.get(), nss).getCollection()); ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); ASSERT_EQUALS(oplogBuffer.getSize(), 0UL); - ASSERT_EQUALS(oplogBuffer.getSentinels_forTest().size(), 0UL); ASSERT_EQUALS(0U, oplogBuffer.getSentinelCount_forTest()); ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPushedTimestamp_forTest()); ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest()); - { - OplogInterfaceLocal collectionReader(_txn.get(), nss.ns()); - auto iter = collectionReader.makeIterator(); - ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus()); - } + _assertDocumentsInCollectionEquals(_txn.get(), nss, {}); BSONObj doc; ASSERT_FALSE(oplogBuffer.peek(_txn.get(), &doc)); @@ -685,84 +653,92 @@ TEST_F(OplogBufferCollectionTest, WaitForDataBlocksAndTimesOutWhenItDoesNotFindD ASSERT_EQUALS(count, 0UL); } -TEST_F(OplogBufferCollectionTest, PushPushesonSentinelsProperly) { - auto nss = makeNamespace(_agent); - OplogBufferCollection oplogBuffer(_storageInterface, nss); - - oplogBuffer.startup(_txn.get()); +void _testPushSentinelsProperly( + OperationContext* txn, + const NamespaceString& nss, + StorageInterface* storageInterface, + stdx::function<void(OperationContext* txn, + OplogBufferCollection* oplogBuffer, + const std::vector<BSONObj>& oplog)> pushDocsFn) { + OplogBufferCollection oplogBuffer(storageInterface, nss); + oplogBuffer.startup(txn); const std::vector<BSONObj> oplog = { BSONObj(), makeOplogEntry(1), BSONObj(), BSONObj(), makeOplogEntry(2), BSONObj(), }; ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); - oplogBuffer.push(_txn.get(), oplog[0]); - oplogBuffer.push(_txn.get(), oplog[1]); - oplogBuffer.push(_txn.get(), oplog[2]); - oplogBuffer.push(_txn.get(), oplog[3]); - oplogBuffer.push(_txn.get(), oplog[4]); - oplogBuffer.push(_txn.get(), oplog[5]); + pushDocsFn(txn, &oplogBuffer, oplog); ASSERT_EQUALS(oplogBuffer.getCount(), 6UL); + _assertDocumentsInCollectionEquals(txn, nss, oplog); +} - auto sentinels = oplogBuffer.getSentinels_forTest(); - ASSERT_EQUALS(sentinels.size(), 4UL); - ASSERT_EQUALS(sentinels.front(), Timestamp(0, 0)); - sentinels.pop(); - ASSERT_EQUALS(sentinels.front(), Timestamp(1, 1)); - sentinels.pop(); - ASSERT_EQUALS(sentinels.front(), Timestamp(1, 1)); - sentinels.pop(); - ASSERT_EQUALS(sentinels.front(), Timestamp(2, 2)); - - { - OplogInterfaceLocal collectionReader(_txn.get(), nss.ns()); - auto iter = collectionReader.makeIterator(); - ASSERT_BSONOBJ_EQ(oplog[4], - OplogBufferCollection::extractEmbeddedOplogDocument( - unittest::assertGet(iter->next()).first)); - ASSERT_BSONOBJ_EQ(oplog[1], - OplogBufferCollection::extractEmbeddedOplogDocument( - unittest::assertGet(iter->next()).first)); - ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus()); - } +TEST_F(OplogBufferCollectionTest, PushPushesOnSentinelsProperly) { + auto nss = makeNamespace(_agent); + _testPushSentinelsProperly(_txn.get(), + nss, + _storageInterface, + [](OperationContext* txn, + OplogBufferCollection* oplogBuffer, + const std::vector<BSONObj>& oplog) { + oplogBuffer->push(txn, oplog[0]); + ASSERT_EQUALS(1U, oplogBuffer->getSentinelCount_forTest()); + + oplogBuffer->push(txn, oplog[1]); + ASSERT_EQUALS(0U, oplogBuffer->getSentinelCount_forTest()); + + oplogBuffer->push(txn, oplog[2]); + ASSERT_EQUALS(1U, oplogBuffer->getSentinelCount_forTest()); + + oplogBuffer->push(txn, oplog[3]); + ASSERT_EQUALS(2U, oplogBuffer->getSentinelCount_forTest()); + + oplogBuffer->push(txn, oplog[4]); + ASSERT_EQUALS(0U, oplogBuffer->getSentinelCount_forTest()); + + oplogBuffer->push(txn, oplog[5]); + ASSERT_EQUALS(1U, oplogBuffer->getSentinelCount_forTest()); + }); } TEST_F(OplogBufferCollectionTest, PushEvenIfFullPushesOnSentinelsProperly) { auto nss = makeNamespace(_agent); - OplogBufferCollection oplogBuffer(_storageInterface, nss); + _testPushSentinelsProperly(_txn.get(), + nss, + _storageInterface, + [](OperationContext* txn, + OplogBufferCollection* oplogBuffer, + const std::vector<BSONObj>& oplog) { + oplogBuffer->pushEvenIfFull(txn, oplog[0]); + ASSERT_EQUALS(1U, oplogBuffer->getSentinelCount_forTest()); - oplogBuffer.startup(_txn.get()); - const std::vector<BSONObj> oplog = { - BSONObj(), makeOplogEntry(1), BSONObj(), BSONObj(), makeOplogEntry(2), BSONObj(), - }; - ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); - oplogBuffer.pushEvenIfFull(_txn.get(), oplog[0]); - oplogBuffer.pushEvenIfFull(_txn.get(), oplog[1]); - oplogBuffer.pushEvenIfFull(_txn.get(), oplog[2]); - oplogBuffer.pushEvenIfFull(_txn.get(), oplog[3]); - oplogBuffer.pushEvenIfFull(_txn.get(), oplog[4]); - oplogBuffer.pushEvenIfFull(_txn.get(), oplog[5]); - ASSERT_EQUALS(oplogBuffer.getCount(), 6UL); + oplogBuffer->pushEvenIfFull(txn, oplog[1]); + ASSERT_EQUALS(0U, oplogBuffer->getSentinelCount_forTest()); - auto sentinels = oplogBuffer.getSentinels_forTest(); - ASSERT_EQUALS(sentinels.size(), 4UL); - ASSERT_EQUALS(sentinels.front(), Timestamp(0, 0)); - sentinels.pop(); - ASSERT_EQUALS(sentinels.front(), Timestamp(1, 1)); - sentinels.pop(); - ASSERT_EQUALS(sentinels.front(), Timestamp(1, 1)); - sentinels.pop(); - ASSERT_EQUALS(sentinels.front(), Timestamp(2, 2)); - - { - OplogInterfaceLocal collectionReader(_txn.get(), nss.ns()); - auto iter = collectionReader.makeIterator(); - ASSERT_BSONOBJ_EQ(oplog[4], - OplogBufferCollection::extractEmbeddedOplogDocument( - unittest::assertGet(iter->next()).first)); - ASSERT_BSONOBJ_EQ(oplog[1], - OplogBufferCollection::extractEmbeddedOplogDocument( - unittest::assertGet(iter->next()).first)); - ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus()); - } + oplogBuffer->pushEvenIfFull(txn, oplog[2]); + ASSERT_EQUALS(1U, oplogBuffer->getSentinelCount_forTest()); + + oplogBuffer->pushEvenIfFull(txn, oplog[3]); + ASSERT_EQUALS(2U, oplogBuffer->getSentinelCount_forTest()); + + oplogBuffer->pushEvenIfFull(txn, oplog[4]); + ASSERT_EQUALS(0U, oplogBuffer->getSentinelCount_forTest()); + + oplogBuffer->pushEvenIfFull(txn, oplog[5]); + ASSERT_EQUALS(1U, oplogBuffer->getSentinelCount_forTest()); + }); +} + +TEST_F(OplogBufferCollectionTest, PushAllNonBlockingPushesOnSentinelsProperly) { + auto nss = makeNamespace(_agent); + _testPushSentinelsProperly(_txn.get(), + nss, + _storageInterface, + [](OperationContext* txn, + OplogBufferCollection* oplogBuffer, + const std::vector<BSONObj>& oplog) { + oplogBuffer->pushAllNonBlocking( + txn, oplog.cbegin(), oplog.cend()); + ASSERT_EQUALS(1U, oplogBuffer->getSentinelCount_forTest()); + }); } DEATH_TEST_F( @@ -780,39 +756,6 @@ DEATH_TEST_F( oplogBuffer.pushAllNonBlocking(_txn.get(), oplog.begin(), oplog.end()); } -TEST_F(OplogBufferCollectionTest, PushAllNonBlockingInsertsSentinelsIntoCollection) { - auto nss = makeNamespace(_agent); - OplogBufferCollection oplogBuffer(_storageInterface, nss); - - oplogBuffer.startup(_txn.get()); - const std::vector<BSONObj> oplog = { - BSONObj(), makeOplogEntry(1), BSONObj(), BSONObj(), makeOplogEntry(2), BSONObj()}; - ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); - oplogBuffer.pushAllNonBlocking(_txn.get(), oplog.begin(), oplog.end()); - - OplogInterfaceLocal collectionReader(_txn.get(), nss.ns()); - auto iter = collectionReader.makeIterator(); - auto doc = unittest::assertGet(iter->next()).first; - ASSERT_BSONOBJ_EQ(BSON("_id" << BSON("ts" << Timestamp(2, 2) << "s" << 1)), doc); - ASSERT_BSONOBJ_EQ(oplog[5], OplogBufferCollection::extractEmbeddedOplogDocument(doc)); - ASSERT_BSONOBJ_EQ(oplog[4], - OplogBufferCollection::extractEmbeddedOplogDocument( - unittest::assertGet(iter->next()).first)); - doc = unittest::assertGet(iter->next()).first; - ASSERT_BSONOBJ_EQ(BSON("_id" << BSON("ts" << Timestamp(1, 1) << "s" << 2)), doc); - ASSERT_BSONOBJ_EQ(oplog[3], OplogBufferCollection::extractEmbeddedOplogDocument(doc)); - doc = unittest::assertGet(iter->next()).first; - ASSERT_BSONOBJ_EQ(BSON("_id" << BSON("ts" << Timestamp(1, 1) << "s" << 1)), doc); - ASSERT_BSONOBJ_EQ(oplog[2], OplogBufferCollection::extractEmbeddedOplogDocument(doc)); - ASSERT_BSONOBJ_EQ(oplog[1], - OplogBufferCollection::extractEmbeddedOplogDocument( - unittest::assertGet(iter->next()).first)); - doc = unittest::assertGet(iter->next()).first; - ASSERT_BSONOBJ_EQ(BSON("_id" << BSON("ts" << Timestamp() << "s" << 1)), doc); - ASSERT_BSONOBJ_EQ(oplog[0], OplogBufferCollection::extractEmbeddedOplogDocument(doc)); - ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus()); -} - TEST_F(OplogBufferCollectionTest, SentinelInMiddleIsReturnedInOrder) { auto nss = makeNamespace(_agent); OplogBufferCollection oplogBuffer(_storageInterface, nss); @@ -828,25 +771,7 @@ TEST_F(OplogBufferCollectionTest, SentinelInMiddleIsReturnedInOrder) { oplogBuffer.pushEvenIfFull(_txn.get(), oplog[3]); ASSERT_EQUALS(oplogBuffer.getCount(), 4UL); - auto checkDocumentsInCollection = [&]() { - OplogInterfaceLocal collectionReader(_txn.get(), nss.ns()); - auto iter = collectionReader.makeIterator(); - ASSERT_BSONOBJ_EQ(oplog[3], - OplogBufferCollection::extractEmbeddedOplogDocument( - unittest::assertGet(iter->next()).first)); - ASSERT_BSONOBJ_EQ(oplog[1], - OplogBufferCollection::extractEmbeddedOplogDocument( - unittest::assertGet(iter->next()).first)); - ASSERT_BSONOBJ_EQ(oplog[0], - OplogBufferCollection::extractEmbeddedOplogDocument( - unittest::assertGet(iter->next()).first)); - ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus()); - }; - checkDocumentsInCollection(); - - auto sentinels = oplogBuffer.getSentinels_forTest(); - ASSERT_EQUALS(sentinels.size(), 1UL); - ASSERT_EQUALS(sentinels.front(), Timestamp(2, 2)); + _assertDocumentsInCollectionEquals(_txn.get(), nss, oplog); BSONObj doc; ASSERT_TRUE(oplogBuffer.peek(_txn.get(), &doc)); @@ -873,9 +798,6 @@ TEST_F(OplogBufferCollectionTest, SentinelInMiddleIsReturnedInOrder) { ASSERT_TRUE(doc.isEmpty()); ASSERT_EQUALS(oplogBuffer.getCount(), 1UL); - sentinels = oplogBuffer.getSentinels_forTest(); - ASSERT_EQUALS(sentinels.size(), 0UL); - ASSERT_TRUE(oplogBuffer.peek(_txn.get(), &doc)); ASSERT_BSONOBJ_EQ(doc, oplog[3]); ASSERT_EQUALS(oplogBuffer.getCount(), 1UL); @@ -885,7 +807,7 @@ TEST_F(OplogBufferCollectionTest, SentinelInMiddleIsReturnedInOrder) { ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); // tryPop does not remove documents from collection. - checkDocumentsInCollection(); + _assertDocumentsInCollectionEquals(_txn.get(), nss, oplog); } TEST_F(OplogBufferCollectionTest, SentinelAtBeginningIsReturnedAtBeginning) { @@ -899,19 +821,7 @@ TEST_F(OplogBufferCollectionTest, SentinelAtBeginningIsReturnedAtBeginning) { oplogBuffer.pushEvenIfFull(_txn.get(), oplog[1]); ASSERT_EQUALS(oplogBuffer.getCount(), 2UL); - auto checkDocumentsInCollection = [&]() { - OplogInterfaceLocal collectionReader(_txn.get(), nss.ns()); - auto iter = collectionReader.makeIterator(); - ASSERT_BSONOBJ_EQ(oplog[1], - OplogBufferCollection::extractEmbeddedOplogDocument( - unittest::assertGet(iter->next()).first)); - ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus()); - }; - checkDocumentsInCollection(); - - auto sentinels = oplogBuffer.getSentinels_forTest(); - ASSERT_EQUALS(sentinels.size(), 1UL); - ASSERT_EQUALS(sentinels.front(), Timestamp(0, 0)); + _assertDocumentsInCollectionEquals(_txn.get(), nss, oplog); BSONObj doc; ASSERT_TRUE(oplogBuffer.peek(_txn.get(), &doc)); @@ -931,7 +841,7 @@ TEST_F(OplogBufferCollectionTest, SentinelAtBeginningIsReturnedAtBeginning) { ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); // tryPop does not remove documents from collection. - checkDocumentsInCollection(); + _assertDocumentsInCollectionEquals(_txn.get(), nss, oplog); } TEST_F(OplogBufferCollectionTest, SentinelAtEndIsReturnedAtEnd) { @@ -945,19 +855,7 @@ TEST_F(OplogBufferCollectionTest, SentinelAtEndIsReturnedAtEnd) { oplogBuffer.pushEvenIfFull(_txn.get(), oplog[1]); ASSERT_EQUALS(oplogBuffer.getCount(), 2UL); - auto checkDocumentsInCollection = [&]() { - OplogInterfaceLocal collectionReader(_txn.get(), nss.ns()); - auto iter = collectionReader.makeIterator(); - ASSERT_BSONOBJ_EQ(oplog[0], - OplogBufferCollection::extractEmbeddedOplogDocument( - unittest::assertGet(iter->next()).first)); - ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus()); - }; - checkDocumentsInCollection(); - - auto sentinels = oplogBuffer.getSentinels_forTest(); - ASSERT_EQUALS(sentinels.size(), 1UL); - ASSERT_EQUALS(sentinels.front(), Timestamp(1, 1)); + _assertDocumentsInCollectionEquals(_txn.get(), nss, oplog); BSONObj doc; ASSERT_TRUE(oplogBuffer.peek(_txn.get(), &doc)); @@ -977,7 +875,7 @@ TEST_F(OplogBufferCollectionTest, SentinelAtEndIsReturnedAtEnd) { ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); // tryPop does not remove documents from collection. - checkDocumentsInCollection(); + _assertDocumentsInCollectionEquals(_txn.get(), nss, oplog); } TEST_F(OplogBufferCollectionTest, MultipleSentinelsAreReturnedInOrder) { @@ -989,36 +887,10 @@ TEST_F(OplogBufferCollectionTest, MultipleSentinelsAreReturnedInOrder) { BSONObj(), makeOplogEntry(1), BSONObj(), BSONObj(), makeOplogEntry(2), BSONObj(), }; ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); - oplogBuffer.pushEvenIfFull(_txn.get(), oplog[0]); - oplogBuffer.pushEvenIfFull(_txn.get(), oplog[1]); - oplogBuffer.pushEvenIfFull(_txn.get(), oplog[2]); - oplogBuffer.pushEvenIfFull(_txn.get(), oplog[3]); - oplogBuffer.pushEvenIfFull(_txn.get(), oplog[4]); - oplogBuffer.pushEvenIfFull(_txn.get(), oplog[5]); + oplogBuffer.pushAllNonBlocking(_txn.get(), oplog.cbegin(), oplog.cend()); ASSERT_EQUALS(oplogBuffer.getCount(), 6UL); - auto checkDocumentsInCollection = [&]() { - OplogInterfaceLocal collectionReader(_txn.get(), nss.ns()); - auto iter = collectionReader.makeIterator(); - ASSERT_BSONOBJ_EQ(oplog[4], - OplogBufferCollection::extractEmbeddedOplogDocument( - unittest::assertGet(iter->next()).first)); - ASSERT_BSONOBJ_EQ(oplog[1], - OplogBufferCollection::extractEmbeddedOplogDocument( - unittest::assertGet(iter->next()).first)); - ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus()); - }; - checkDocumentsInCollection(); - - auto sentinels = oplogBuffer.getSentinels_forTest(); - ASSERT_EQUALS(sentinels.size(), 4UL); - ASSERT_EQUALS(sentinels.front(), Timestamp(0, 0)); - sentinels.pop(); - ASSERT_EQUALS(sentinels.front(), Timestamp(1, 1)); - sentinels.pop(); - ASSERT_EQUALS(sentinels.front(), Timestamp(1, 1)); - sentinels.pop(); - ASSERT_EQUALS(sentinels.front(), Timestamp(2, 2)); + _assertDocumentsInCollectionEquals(_txn.get(), nss, oplog); BSONObj doc; ASSERT_TRUE(oplogBuffer.peek(_txn.get(), &doc)); @@ -1070,7 +942,7 @@ TEST_F(OplogBufferCollectionTest, MultipleSentinelsAreReturnedInOrder) { ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); // tryPop does not remove documents from collection. - checkDocumentsInCollection(); + _assertDocumentsInCollectionEquals(_txn.get(), nss, oplog); } TEST_F(OplogBufferCollectionTest, WaitForDataBlocksAndFindsSentinel) { |