summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2016-10-19 21:55:52 -0400
committerBenety Goh <benety@mongodb.com>2016-10-21 19:27:12 -0400
commit9700b08a0cfaafaca6bac9efa5f188b0acf0f464 (patch)
tree57c01e361e7bd63093e4e8854dea8c914064994e
parent864ac6c37abe024239beba37bf319438631487ae (diff)
downloadmongo-9700b08a0cfaafaca6bac9efa5f188b0acf0f464.tar.gz
SERVER-26666 OplogBufferCollection saves sentinels in collection
-rw-r--r--src/mongo/db/repl/oplog_buffer_collection.cpp33
-rw-r--r--src/mongo/db/repl/oplog_buffer_collection.h5
-rw-r--r--src/mongo/db/repl/oplog_buffer_collection_test.cpp402
3 files changed, 137 insertions, 303 deletions
diff --git a/src/mongo/db/repl/oplog_buffer_collection.cpp b/src/mongo/db/repl/oplog_buffer_collection.cpp
index 1e3df806934..bf7ca8bdd73 100644
--- a/src/mongo/db/repl/oplog_buffer_collection.cpp
+++ b/src/mongo/db/repl/oplog_buffer_collection.cpp
@@ -99,14 +99,6 @@ void OplogBufferCollection::shutdown(OperationContext* txn) {
}
void OplogBufferCollection::pushEvenIfFull(OperationContext* txn, const Value& value) {
- // This oplog entry is a sentinel
- if (value.isEmpty()) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _sentinels.push(_lastPushedTimestamp);
- _count++;
- _cvNoLongerEmpty.notify_all();
- return;
- }
Batch valueBatch = {value};
pushAllNonBlocking(txn, valueBatch.begin(), valueBatch.end());
}
@@ -173,7 +165,6 @@ void OplogBufferCollection::clear(OperationContext* txn) {
_createCollection(txn);
_size = 0;
_count = 0;
- std::queue<Timestamp>().swap(_sentinels);
_sentinelCount = 0;
_lastPushedTimestamp = {};
_lastPoppedKey = {};
@@ -219,15 +210,6 @@ boost::optional<OplogBuffer::Value> OplogBufferCollection::lastObjectPushed(
}
bool OplogBufferCollection::_pop_inlock(OperationContext* txn, Value* value) {
- // If there is a sentinel, and it was pushed right after the last BSONObj to be popped was
- // pushed, then we pop off a sentinel instead and decrease the count by 1.
- if (!_sentinels.empty() && (_getLastPoppedTimestamp_inlock() == _sentinels.front())) {
- _sentinels.pop();
- _count--;
- *value = BSONObj();
- return true;
- }
-
BSONObj docFromCollection;
if (!_peekOneSide_inlock(
txn, &docFromCollection, true, PeekMode::kReturnUnmodifiedDocumentFromCollection)) {
@@ -249,12 +231,6 @@ bool OplogBufferCollection::_peekOneSide_inlock(OperationContext* txn,
PeekMode peekMode) const {
invariant(_count > 0);
- // If there is a sentinel, and it was pushed right after the last BSONObj to be popped was
- // pushed, then we return an empty BSONObj for the sentinel.
- if (!_sentinels.empty() && (_getLastPoppedTimestamp_inlock() == _sentinels.front())) {
- *value = BSONObj();
- return true;
- }
auto scanDirection = front ? StorageInterface::ScanDirection::kForward
: StorageInterface::ScanDirection::kBackward;
BSONObj startKey;
@@ -295,11 +271,6 @@ void OplogBufferCollection::_dropCollection(OperationContext* txn) {
fassert(40155, _storageInterface->dropCollection(txn, _nss));
}
-std::queue<Timestamp> OplogBufferCollection::getSentinels_forTest() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _sentinels;
-}
-
std::size_t OplogBufferCollection::getSentinelCount_forTest() const {
return _sentinelCount;
}
@@ -311,10 +282,6 @@ Timestamp OplogBufferCollection::getLastPushedTimestamp_forTest() const {
Timestamp OplogBufferCollection::getLastPoppedTimestamp_forTest() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _getLastPoppedTimestamp_inlock();
-}
-
-Timestamp OplogBufferCollection::_getLastPoppedTimestamp_inlock() const {
return _lastPoppedKey.isEmpty() ? Timestamp() : _lastPoppedKey[""].Obj()["ts"].timestamp();
}
diff --git a/src/mongo/db/repl/oplog_buffer_collection.h b/src/mongo/db/repl/oplog_buffer_collection.h
index 5e08983ac5d..f989fff8dc6 100644
--- a/src/mongo/db/repl/oplog_buffer_collection.h
+++ b/src/mongo/db/repl/oplog_buffer_collection.h
@@ -115,7 +115,6 @@ public:
boost::optional<Value> lastObjectPushed(OperationContext* txn) const override;
// ---- Testing API ----
- std::queue<Timestamp> getSentinels_forTest() const;
std::size_t getSentinelCount_forTest() const;
Timestamp getLastPushedTimestamp_forTest() const;
Timestamp getLastPoppedTimestamp_forTest() const;
@@ -150,8 +149,6 @@ private:
*/
bool _pop_inlock(OperationContext* txn, Value* value);
- Timestamp _getLastPoppedTimestamp_inlock() const;
-
// The namespace for the oplog buffer collection.
const NamespaceString _nss;
@@ -168,8 +165,6 @@ private:
// Size of documents in buffer.
std::size_t _size;
- std::queue<Timestamp> _sentinels;
-
// Number of sentinel values inserted so far with the same timestamp as '_lastPoppedKey'.
std::size_t _sentinelCount = 0;
diff --git a/src/mongo/db/repl/oplog_buffer_collection_test.cpp b/src/mongo/db/repl/oplog_buffer_collection_test.cpp
index 205042d2837..11d6030d472 100644
--- a/src/mongo/db/repl/oplog_buffer_collection_test.cpp
+++ b/src/mongo/db/repl/oplog_buffer_collection_test.cpp
@@ -247,6 +247,36 @@ DEATH_TEST_F(OplogBufferCollectionTest,
OplogBufferCollection::addIdToDocument(BSON("x" << 1), {}, 0);
}
+/**
+ * Check collection contents. OplogInterface returns documents in reverse natural order.
+ */
+void _assertDocumentsInCollectionEquals(OperationContext* txn,
+ const NamespaceString& nss,
+ const std::vector<BSONObj>& docs) {
+ std::vector<BSONObj> reversedTransformedDocs;
+ Timestamp ts;
+ std::size_t sentinelCount = 0;
+ for (const auto& doc : docs) {
+ auto previousTimestamp = ts;
+ BSONObj newDoc;
+ std::tie(newDoc, ts, sentinelCount) =
+ OplogBufferCollection::addIdToDocument(doc, ts, sentinelCount);
+ reversedTransformedDocs.push_back(newDoc);
+ if (doc.isEmpty()) {
+ ASSERT_EQUALS(previousTimestamp, ts);
+ continue;
+ }
+ ASSERT_GT(ts, previousTimestamp);
+ }
+ std::reverse(reversedTransformedDocs.begin(), reversedTransformedDocs.end());
+ OplogInterfaceLocal oplog(txn, nss.ns());
+ auto iter = oplog.makeIterator();
+ for (const auto& doc : reversedTransformedDocs) {
+ ASSERT_BSONOBJ_EQ(doc, unittest::assertGet(iter->next()).first);
+ }
+ ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus());
+}
+
TEST_F(OplogBufferCollectionTest, PushOneDocumentWithPushAllNonBlockingAddsDocument) {
auto nss = makeNamespace(_agent);
OplogBufferCollection oplogBuffer(_storageInterface, nss);
@@ -257,14 +287,7 @@ TEST_F(OplogBufferCollectionTest, PushOneDocumentWithPushAllNonBlockingAddsDocum
oplogBuffer.pushAllNonBlocking(_txn.get(), oplog.begin(), oplog.end());
ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
- {
- OplogInterfaceLocal collectionReader(_txn.get(), nss.ns());
- auto iter = collectionReader.makeIterator();
- ASSERT_BSONOBJ_EQ(oplog[0],
- OplogBufferCollection::extractEmbeddedOplogDocument(
- unittest::assertGet(iter->next()).first));
- ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus());
- }
+ _assertDocumentsInCollectionEquals(_txn.get(), nss, oplog);
}
TEST_F(OplogBufferCollectionTest,
@@ -289,14 +312,7 @@ TEST_F(OplogBufferCollectionTest, PushOneDocumentWithPushAddsDocument) {
oplogBuffer.push(_txn.get(), oplog);
ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
- {
- OplogInterfaceLocal collectionReader(_txn.get(), nss.ns());
- auto iter = collectionReader.makeIterator();
- ASSERT_BSONOBJ_EQ(oplog,
- OplogBufferCollection::extractEmbeddedOplogDocument(
- unittest::assertGet(iter->next()).first));
- ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus());
- }
+ _assertDocumentsInCollectionEquals(_txn.get(), nss, {oplog});
}
TEST_F(OplogBufferCollectionTest, PushOneDocumentWithPushEvenIfFullAddsDocument) {
@@ -309,17 +325,9 @@ TEST_F(OplogBufferCollectionTest, PushOneDocumentWithPushEvenIfFullAddsDocument)
oplogBuffer.pushEvenIfFull(_txn.get(), oplog);
ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
- auto sentinels = oplogBuffer.getSentinels_forTest();
- ASSERT_EQUALS(sentinels.size(), 0UL);
+ ASSERT_EQUALS(0UL, oplogBuffer.getSentinelCount_forTest());
- {
- OplogInterfaceLocal collectionReader(_txn.get(), nss.ns());
- auto iter = collectionReader.makeIterator();
- ASSERT_BSONOBJ_EQ(oplog,
- OplogBufferCollection::extractEmbeddedOplogDocument(
- unittest::assertGet(iter->next()).first));
- ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus());
- }
+ _assertDocumentsInCollectionEquals(_txn.get(), nss, {oplog});
}
TEST_F(OplogBufferCollectionTest, PeekDoesNotRemoveDocument) {
@@ -348,17 +356,7 @@ TEST_F(OplogBufferCollectionTest, PeekDoesNotRemoveDocument) {
ASSERT_BSONOBJ_EQ(doc, oplog1);
ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
- {
- OplogInterfaceLocal collectionReader(_txn.get(), nss.ns());
- auto iter = collectionReader.makeIterator();
- ASSERT_BSONOBJ_EQ(oplog2,
- OplogBufferCollection::extractEmbeddedOplogDocument(
- unittest::assertGet(iter->next()).first));
- ASSERT_BSONOBJ_EQ(oplog1,
- OplogBufferCollection::extractEmbeddedOplogDocument(
- unittest::assertGet(iter->next()).first));
- ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus());
- }
+ _assertDocumentsInCollectionEquals(_txn.get(), nss, {oplog1, oplog2});
}
TEST_F(OplogBufferCollectionTest, PeekWithNoDocumentsReturnsFalse) {
@@ -373,11 +371,7 @@ TEST_F(OplogBufferCollectionTest, PeekWithNoDocumentsReturnsFalse) {
ASSERT_TRUE(doc.isEmpty());
ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
- {
- OplogInterfaceLocal collectionReader(_txn.get(), nss.ns());
- auto iter = collectionReader.makeIterator();
- ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus());
- }
+ _assertDocumentsInCollectionEquals(_txn.get(), nss, {});
}
TEST_F(OplogBufferCollectionTest, PopDoesNotRemoveDocumentFromCollection) {
@@ -395,14 +389,7 @@ TEST_F(OplogBufferCollectionTest, PopDoesNotRemoveDocumentFromCollection) {
ASSERT_BSONOBJ_EQ(doc, oplog);
ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
- {
- OplogInterfaceLocal collectionReader(_txn.get(), nss.ns());
- auto iter = collectionReader.makeIterator();
- ASSERT_BSONOBJ_EQ(oplog,
- OplogBufferCollection::extractEmbeddedOplogDocument(
- unittest::assertGet(iter->next()).first));
- ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus());
- }
+ _assertDocumentsInCollectionEquals(_txn.get(), nss, {oplog});
}
TEST_F(OplogBufferCollectionTest, PopWithNoDocumentsReturnsFalse) {
@@ -417,11 +404,7 @@ TEST_F(OplogBufferCollectionTest, PopWithNoDocumentsReturnsFalse) {
ASSERT_TRUE(doc.isEmpty());
ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
- {
- OplogInterfaceLocal collectionReader(_txn.get(), nss.ns());
- auto iter = collectionReader.makeIterator();
- ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus());
- }
+ _assertDocumentsInCollectionEquals(_txn.get(), nss, {});
}
TEST_F(OplogBufferCollectionTest, PopAndPeekReturnDocumentsInOrder) {
@@ -436,21 +419,7 @@ TEST_F(OplogBufferCollectionTest, PopAndPeekReturnDocumentsInOrder) {
oplogBuffer.pushAllNonBlocking(_txn.get(), oplog.begin(), oplog.end());
ASSERT_EQUALS(oplogBuffer.getCount(), 3UL);
- auto checkDocumentsInCollection = [&]() {
- OplogInterfaceLocal collectionReader(_txn.get(), nss.ns());
- auto iter = collectionReader.makeIterator();
- ASSERT_BSONOBJ_EQ(oplog[2],
- OplogBufferCollection::extractEmbeddedOplogDocument(
- unittest::assertGet(iter->next()).first));
- ASSERT_BSONOBJ_EQ(oplog[1],
- OplogBufferCollection::extractEmbeddedOplogDocument(
- unittest::assertGet(iter->next()).first));
- ASSERT_BSONOBJ_EQ(oplog[0],
- OplogBufferCollection::extractEmbeddedOplogDocument(
- unittest::assertGet(iter->next()).first));
- ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus());
- };
- checkDocumentsInCollection();
+ _assertDocumentsInCollectionEquals(_txn.get(), nss, oplog);
BSONObj doc;
ASSERT_TRUE(oplogBuffer.peek(_txn.get(), &doc));
@@ -478,7 +447,7 @@ TEST_F(OplogBufferCollectionTest, PopAndPeekReturnDocumentsInOrder) {
ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
// tryPop does not remove documents from collection.
- checkDocumentsInCollection();
+ _assertDocumentsInCollectionEquals(_txn.get(), nss, oplog);
}
TEST_F(OplogBufferCollectionTest, LastObjectPushedReturnsNewestOplogEntry) {
@@ -526,7 +495,6 @@ TEST_F(OplogBufferCollectionTest, ClearClearsCollection) {
oplogBuffer.startup(_txn.get());
ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
ASSERT_EQUALS(oplogBuffer.getSize(), 0UL);
- ASSERT_EQUALS(oplogBuffer.getSentinels_forTest().size(), 0UL);
ASSERT_EQUALS(0U, oplogBuffer.getSentinelCount_forTest());
ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPushedTimestamp_forTest());
ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest());
@@ -535,53 +503,53 @@ TEST_F(OplogBufferCollectionTest, ClearClearsCollection) {
oplogBuffer.push(_txn.get(), oplog);
ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog.objsize()));
- ASSERT_EQUALS(oplogBuffer.getSentinels_forTest().size(), 0UL);
ASSERT_EQUALS(0U, oplogBuffer.getSentinelCount_forTest());
ASSERT_EQUALS(oplog["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp_forTest());
ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest());
+ _assertDocumentsInCollectionEquals(_txn.get(), nss, {oplog});
+
BSONObj sentinel;
oplogBuffer.push(_txn.get(), sentinel);
ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
- ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog.objsize()));
- ASSERT_EQUALS(oplogBuffer.getSentinels_forTest().size(), 1UL);
- ASSERT_EQUALS(0U, oplogBuffer.getSentinelCount_forTest());
+ ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog.objsize() + BSONObj().objsize()));
+ ASSERT_EQUALS(1U, oplogBuffer.getSentinelCount_forTest());
ASSERT_EQUALS(oplog["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp_forTest());
ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest());
+ _assertDocumentsInCollectionEquals(_txn.get(), nss, {oplog, sentinel});
+
BSONObj oplog2 = makeOplogEntry(2);
oplogBuffer.push(_txn.get(), oplog2);
ASSERT_EQUALS(oplogBuffer.getCount(), 3UL);
- ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog.objsize() + oplog2.objsize()));
- ASSERT_EQUALS(oplogBuffer.getSentinels_forTest().size(), 1UL);
+ ASSERT_EQUALS(oplogBuffer.getSize(),
+ std::size_t(oplog.objsize() + BSONObj().objsize() + oplog2.objsize()));
ASSERT_EQUALS(0U, oplogBuffer.getSentinelCount_forTest());
ASSERT_EQUALS(oplog2["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp_forTest());
ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest());
+ _assertDocumentsInCollectionEquals(_txn.get(), nss, {oplog, sentinel, oplog2});
+
BSONObj poppedDoc;
ASSERT_TRUE(oplogBuffer.tryPop(_txn.get(), &poppedDoc));
ASSERT_BSONOBJ_EQ(oplog, poppedDoc);
ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
- ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog.objsize()));
- ASSERT_EQUALS(oplogBuffer.getSentinels_forTest().size(), 1UL);
+ ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(BSONObj().objsize() + oplog2.objsize()));
ASSERT_EQUALS(0U, oplogBuffer.getSentinelCount_forTest());
ASSERT_EQUALS(oplog2["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp_forTest());
ASSERT_EQUALS(oplog["ts"].timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest());
+ _assertDocumentsInCollectionEquals(_txn.get(), nss, {oplog, sentinel, oplog2});
+
oplogBuffer.clear(_txn.get());
ASSERT_TRUE(AutoGetCollectionForRead(_txn.get(), nss).getCollection());
ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
ASSERT_EQUALS(oplogBuffer.getSize(), 0UL);
- ASSERT_EQUALS(oplogBuffer.getSentinels_forTest().size(), 0UL);
ASSERT_EQUALS(0U, oplogBuffer.getSentinelCount_forTest());
ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPushedTimestamp_forTest());
ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest());
- {
- OplogInterfaceLocal collectionReader(_txn.get(), nss.ns());
- auto iter = collectionReader.makeIterator();
- ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus());
- }
+ _assertDocumentsInCollectionEquals(_txn.get(), nss, {});
BSONObj doc;
ASSERT_FALSE(oplogBuffer.peek(_txn.get(), &doc));
@@ -685,84 +653,92 @@ TEST_F(OplogBufferCollectionTest, WaitForDataBlocksAndTimesOutWhenItDoesNotFindD
ASSERT_EQUALS(count, 0UL);
}
-TEST_F(OplogBufferCollectionTest, PushPushesonSentinelsProperly) {
- auto nss = makeNamespace(_agent);
- OplogBufferCollection oplogBuffer(_storageInterface, nss);
-
- oplogBuffer.startup(_txn.get());
+void _testPushSentinelsProperly(
+ OperationContext* txn,
+ const NamespaceString& nss,
+ StorageInterface* storageInterface,
+ stdx::function<void(OperationContext* txn,
+ OplogBufferCollection* oplogBuffer,
+ const std::vector<BSONObj>& oplog)> pushDocsFn) {
+ OplogBufferCollection oplogBuffer(storageInterface, nss);
+ oplogBuffer.startup(txn);
const std::vector<BSONObj> oplog = {
BSONObj(), makeOplogEntry(1), BSONObj(), BSONObj(), makeOplogEntry(2), BSONObj(),
};
ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
- oplogBuffer.push(_txn.get(), oplog[0]);
- oplogBuffer.push(_txn.get(), oplog[1]);
- oplogBuffer.push(_txn.get(), oplog[2]);
- oplogBuffer.push(_txn.get(), oplog[3]);
- oplogBuffer.push(_txn.get(), oplog[4]);
- oplogBuffer.push(_txn.get(), oplog[5]);
+ pushDocsFn(txn, &oplogBuffer, oplog);
ASSERT_EQUALS(oplogBuffer.getCount(), 6UL);
+ _assertDocumentsInCollectionEquals(txn, nss, oplog);
+}
- auto sentinels = oplogBuffer.getSentinels_forTest();
- ASSERT_EQUALS(sentinels.size(), 4UL);
- ASSERT_EQUALS(sentinels.front(), Timestamp(0, 0));
- sentinels.pop();
- ASSERT_EQUALS(sentinels.front(), Timestamp(1, 1));
- sentinels.pop();
- ASSERT_EQUALS(sentinels.front(), Timestamp(1, 1));
- sentinels.pop();
- ASSERT_EQUALS(sentinels.front(), Timestamp(2, 2));
-
- {
- OplogInterfaceLocal collectionReader(_txn.get(), nss.ns());
- auto iter = collectionReader.makeIterator();
- ASSERT_BSONOBJ_EQ(oplog[4],
- OplogBufferCollection::extractEmbeddedOplogDocument(
- unittest::assertGet(iter->next()).first));
- ASSERT_BSONOBJ_EQ(oplog[1],
- OplogBufferCollection::extractEmbeddedOplogDocument(
- unittest::assertGet(iter->next()).first));
- ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus());
- }
+TEST_F(OplogBufferCollectionTest, PushPushesOnSentinelsProperly) {
+ auto nss = makeNamespace(_agent);
+ _testPushSentinelsProperly(_txn.get(),
+ nss,
+ _storageInterface,
+ [](OperationContext* txn,
+ OplogBufferCollection* oplogBuffer,
+ const std::vector<BSONObj>& oplog) {
+ oplogBuffer->push(txn, oplog[0]);
+ ASSERT_EQUALS(1U, oplogBuffer->getSentinelCount_forTest());
+
+ oplogBuffer->push(txn, oplog[1]);
+ ASSERT_EQUALS(0U, oplogBuffer->getSentinelCount_forTest());
+
+ oplogBuffer->push(txn, oplog[2]);
+ ASSERT_EQUALS(1U, oplogBuffer->getSentinelCount_forTest());
+
+ oplogBuffer->push(txn, oplog[3]);
+ ASSERT_EQUALS(2U, oplogBuffer->getSentinelCount_forTest());
+
+ oplogBuffer->push(txn, oplog[4]);
+ ASSERT_EQUALS(0U, oplogBuffer->getSentinelCount_forTest());
+
+ oplogBuffer->push(txn, oplog[5]);
+ ASSERT_EQUALS(1U, oplogBuffer->getSentinelCount_forTest());
+ });
}
TEST_F(OplogBufferCollectionTest, PushEvenIfFullPushesOnSentinelsProperly) {
auto nss = makeNamespace(_agent);
- OplogBufferCollection oplogBuffer(_storageInterface, nss);
+ _testPushSentinelsProperly(_txn.get(),
+ nss,
+ _storageInterface,
+ [](OperationContext* txn,
+ OplogBufferCollection* oplogBuffer,
+ const std::vector<BSONObj>& oplog) {
+ oplogBuffer->pushEvenIfFull(txn, oplog[0]);
+ ASSERT_EQUALS(1U, oplogBuffer->getSentinelCount_forTest());
- oplogBuffer.startup(_txn.get());
- const std::vector<BSONObj> oplog = {
- BSONObj(), makeOplogEntry(1), BSONObj(), BSONObj(), makeOplogEntry(2), BSONObj(),
- };
- ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
- oplogBuffer.pushEvenIfFull(_txn.get(), oplog[0]);
- oplogBuffer.pushEvenIfFull(_txn.get(), oplog[1]);
- oplogBuffer.pushEvenIfFull(_txn.get(), oplog[2]);
- oplogBuffer.pushEvenIfFull(_txn.get(), oplog[3]);
- oplogBuffer.pushEvenIfFull(_txn.get(), oplog[4]);
- oplogBuffer.pushEvenIfFull(_txn.get(), oplog[5]);
- ASSERT_EQUALS(oplogBuffer.getCount(), 6UL);
+ oplogBuffer->pushEvenIfFull(txn, oplog[1]);
+ ASSERT_EQUALS(0U, oplogBuffer->getSentinelCount_forTest());
- auto sentinels = oplogBuffer.getSentinels_forTest();
- ASSERT_EQUALS(sentinels.size(), 4UL);
- ASSERT_EQUALS(sentinels.front(), Timestamp(0, 0));
- sentinels.pop();
- ASSERT_EQUALS(sentinels.front(), Timestamp(1, 1));
- sentinels.pop();
- ASSERT_EQUALS(sentinels.front(), Timestamp(1, 1));
- sentinels.pop();
- ASSERT_EQUALS(sentinels.front(), Timestamp(2, 2));
-
- {
- OplogInterfaceLocal collectionReader(_txn.get(), nss.ns());
- auto iter = collectionReader.makeIterator();
- ASSERT_BSONOBJ_EQ(oplog[4],
- OplogBufferCollection::extractEmbeddedOplogDocument(
- unittest::assertGet(iter->next()).first));
- ASSERT_BSONOBJ_EQ(oplog[1],
- OplogBufferCollection::extractEmbeddedOplogDocument(
- unittest::assertGet(iter->next()).first));
- ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus());
- }
+ oplogBuffer->pushEvenIfFull(txn, oplog[2]);
+ ASSERT_EQUALS(1U, oplogBuffer->getSentinelCount_forTest());
+
+ oplogBuffer->pushEvenIfFull(txn, oplog[3]);
+ ASSERT_EQUALS(2U, oplogBuffer->getSentinelCount_forTest());
+
+ oplogBuffer->pushEvenIfFull(txn, oplog[4]);
+ ASSERT_EQUALS(0U, oplogBuffer->getSentinelCount_forTest());
+
+ oplogBuffer->pushEvenIfFull(txn, oplog[5]);
+ ASSERT_EQUALS(1U, oplogBuffer->getSentinelCount_forTest());
+ });
+}
+
+TEST_F(OplogBufferCollectionTest, PushAllNonBlockingPushesOnSentinelsProperly) {
+ auto nss = makeNamespace(_agent);
+ _testPushSentinelsProperly(_txn.get(),
+ nss,
+ _storageInterface,
+ [](OperationContext* txn,
+ OplogBufferCollection* oplogBuffer,
+ const std::vector<BSONObj>& oplog) {
+ oplogBuffer->pushAllNonBlocking(
+ txn, oplog.cbegin(), oplog.cend());
+ ASSERT_EQUALS(1U, oplogBuffer->getSentinelCount_forTest());
+ });
}
DEATH_TEST_F(
@@ -780,39 +756,6 @@ DEATH_TEST_F(
oplogBuffer.pushAllNonBlocking(_txn.get(), oplog.begin(), oplog.end());
}
-TEST_F(OplogBufferCollectionTest, PushAllNonBlockingInsertsSentinelsIntoCollection) {
- auto nss = makeNamespace(_agent);
- OplogBufferCollection oplogBuffer(_storageInterface, nss);
-
- oplogBuffer.startup(_txn.get());
- const std::vector<BSONObj> oplog = {
- BSONObj(), makeOplogEntry(1), BSONObj(), BSONObj(), makeOplogEntry(2), BSONObj()};
- ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
- oplogBuffer.pushAllNonBlocking(_txn.get(), oplog.begin(), oplog.end());
-
- OplogInterfaceLocal collectionReader(_txn.get(), nss.ns());
- auto iter = collectionReader.makeIterator();
- auto doc = unittest::assertGet(iter->next()).first;
- ASSERT_BSONOBJ_EQ(BSON("_id" << BSON("ts" << Timestamp(2, 2) << "s" << 1)), doc);
- ASSERT_BSONOBJ_EQ(oplog[5], OplogBufferCollection::extractEmbeddedOplogDocument(doc));
- ASSERT_BSONOBJ_EQ(oplog[4],
- OplogBufferCollection::extractEmbeddedOplogDocument(
- unittest::assertGet(iter->next()).first));
- doc = unittest::assertGet(iter->next()).first;
- ASSERT_BSONOBJ_EQ(BSON("_id" << BSON("ts" << Timestamp(1, 1) << "s" << 2)), doc);
- ASSERT_BSONOBJ_EQ(oplog[3], OplogBufferCollection::extractEmbeddedOplogDocument(doc));
- doc = unittest::assertGet(iter->next()).first;
- ASSERT_BSONOBJ_EQ(BSON("_id" << BSON("ts" << Timestamp(1, 1) << "s" << 1)), doc);
- ASSERT_BSONOBJ_EQ(oplog[2], OplogBufferCollection::extractEmbeddedOplogDocument(doc));
- ASSERT_BSONOBJ_EQ(oplog[1],
- OplogBufferCollection::extractEmbeddedOplogDocument(
- unittest::assertGet(iter->next()).first));
- doc = unittest::assertGet(iter->next()).first;
- ASSERT_BSONOBJ_EQ(BSON("_id" << BSON("ts" << Timestamp() << "s" << 1)), doc);
- ASSERT_BSONOBJ_EQ(oplog[0], OplogBufferCollection::extractEmbeddedOplogDocument(doc));
- ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus());
-}
-
TEST_F(OplogBufferCollectionTest, SentinelInMiddleIsReturnedInOrder) {
auto nss = makeNamespace(_agent);
OplogBufferCollection oplogBuffer(_storageInterface, nss);
@@ -828,25 +771,7 @@ TEST_F(OplogBufferCollectionTest, SentinelInMiddleIsReturnedInOrder) {
oplogBuffer.pushEvenIfFull(_txn.get(), oplog[3]);
ASSERT_EQUALS(oplogBuffer.getCount(), 4UL);
- auto checkDocumentsInCollection = [&]() {
- OplogInterfaceLocal collectionReader(_txn.get(), nss.ns());
- auto iter = collectionReader.makeIterator();
- ASSERT_BSONOBJ_EQ(oplog[3],
- OplogBufferCollection::extractEmbeddedOplogDocument(
- unittest::assertGet(iter->next()).first));
- ASSERT_BSONOBJ_EQ(oplog[1],
- OplogBufferCollection::extractEmbeddedOplogDocument(
- unittest::assertGet(iter->next()).first));
- ASSERT_BSONOBJ_EQ(oplog[0],
- OplogBufferCollection::extractEmbeddedOplogDocument(
- unittest::assertGet(iter->next()).first));
- ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus());
- };
- checkDocumentsInCollection();
-
- auto sentinels = oplogBuffer.getSentinels_forTest();
- ASSERT_EQUALS(sentinels.size(), 1UL);
- ASSERT_EQUALS(sentinels.front(), Timestamp(2, 2));
+ _assertDocumentsInCollectionEquals(_txn.get(), nss, oplog);
BSONObj doc;
ASSERT_TRUE(oplogBuffer.peek(_txn.get(), &doc));
@@ -873,9 +798,6 @@ TEST_F(OplogBufferCollectionTest, SentinelInMiddleIsReturnedInOrder) {
ASSERT_TRUE(doc.isEmpty());
ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
- sentinels = oplogBuffer.getSentinels_forTest();
- ASSERT_EQUALS(sentinels.size(), 0UL);
-
ASSERT_TRUE(oplogBuffer.peek(_txn.get(), &doc));
ASSERT_BSONOBJ_EQ(doc, oplog[3]);
ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
@@ -885,7 +807,7 @@ TEST_F(OplogBufferCollectionTest, SentinelInMiddleIsReturnedInOrder) {
ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
// tryPop does not remove documents from collection.
- checkDocumentsInCollection();
+ _assertDocumentsInCollectionEquals(_txn.get(), nss, oplog);
}
TEST_F(OplogBufferCollectionTest, SentinelAtBeginningIsReturnedAtBeginning) {
@@ -899,19 +821,7 @@ TEST_F(OplogBufferCollectionTest, SentinelAtBeginningIsReturnedAtBeginning) {
oplogBuffer.pushEvenIfFull(_txn.get(), oplog[1]);
ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
- auto checkDocumentsInCollection = [&]() {
- OplogInterfaceLocal collectionReader(_txn.get(), nss.ns());
- auto iter = collectionReader.makeIterator();
- ASSERT_BSONOBJ_EQ(oplog[1],
- OplogBufferCollection::extractEmbeddedOplogDocument(
- unittest::assertGet(iter->next()).first));
- ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus());
- };
- checkDocumentsInCollection();
-
- auto sentinels = oplogBuffer.getSentinels_forTest();
- ASSERT_EQUALS(sentinels.size(), 1UL);
- ASSERT_EQUALS(sentinels.front(), Timestamp(0, 0));
+ _assertDocumentsInCollectionEquals(_txn.get(), nss, oplog);
BSONObj doc;
ASSERT_TRUE(oplogBuffer.peek(_txn.get(), &doc));
@@ -931,7 +841,7 @@ TEST_F(OplogBufferCollectionTest, SentinelAtBeginningIsReturnedAtBeginning) {
ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
// tryPop does not remove documents from collection.
- checkDocumentsInCollection();
+ _assertDocumentsInCollectionEquals(_txn.get(), nss, oplog);
}
TEST_F(OplogBufferCollectionTest, SentinelAtEndIsReturnedAtEnd) {
@@ -945,19 +855,7 @@ TEST_F(OplogBufferCollectionTest, SentinelAtEndIsReturnedAtEnd) {
oplogBuffer.pushEvenIfFull(_txn.get(), oplog[1]);
ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
- auto checkDocumentsInCollection = [&]() {
- OplogInterfaceLocal collectionReader(_txn.get(), nss.ns());
- auto iter = collectionReader.makeIterator();
- ASSERT_BSONOBJ_EQ(oplog[0],
- OplogBufferCollection::extractEmbeddedOplogDocument(
- unittest::assertGet(iter->next()).first));
- ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus());
- };
- checkDocumentsInCollection();
-
- auto sentinels = oplogBuffer.getSentinels_forTest();
- ASSERT_EQUALS(sentinels.size(), 1UL);
- ASSERT_EQUALS(sentinels.front(), Timestamp(1, 1));
+ _assertDocumentsInCollectionEquals(_txn.get(), nss, oplog);
BSONObj doc;
ASSERT_TRUE(oplogBuffer.peek(_txn.get(), &doc));
@@ -977,7 +875,7 @@ TEST_F(OplogBufferCollectionTest, SentinelAtEndIsReturnedAtEnd) {
ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
// tryPop does not remove documents from collection.
- checkDocumentsInCollection();
+ _assertDocumentsInCollectionEquals(_txn.get(), nss, oplog);
}
TEST_F(OplogBufferCollectionTest, MultipleSentinelsAreReturnedInOrder) {
@@ -989,36 +887,10 @@ TEST_F(OplogBufferCollectionTest, MultipleSentinelsAreReturnedInOrder) {
BSONObj(), makeOplogEntry(1), BSONObj(), BSONObj(), makeOplogEntry(2), BSONObj(),
};
ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
- oplogBuffer.pushEvenIfFull(_txn.get(), oplog[0]);
- oplogBuffer.pushEvenIfFull(_txn.get(), oplog[1]);
- oplogBuffer.pushEvenIfFull(_txn.get(), oplog[2]);
- oplogBuffer.pushEvenIfFull(_txn.get(), oplog[3]);
- oplogBuffer.pushEvenIfFull(_txn.get(), oplog[4]);
- oplogBuffer.pushEvenIfFull(_txn.get(), oplog[5]);
+ oplogBuffer.pushAllNonBlocking(_txn.get(), oplog.cbegin(), oplog.cend());
ASSERT_EQUALS(oplogBuffer.getCount(), 6UL);
- auto checkDocumentsInCollection = [&]() {
- OplogInterfaceLocal collectionReader(_txn.get(), nss.ns());
- auto iter = collectionReader.makeIterator();
- ASSERT_BSONOBJ_EQ(oplog[4],
- OplogBufferCollection::extractEmbeddedOplogDocument(
- unittest::assertGet(iter->next()).first));
- ASSERT_BSONOBJ_EQ(oplog[1],
- OplogBufferCollection::extractEmbeddedOplogDocument(
- unittest::assertGet(iter->next()).first));
- ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, iter->next().getStatus());
- };
- checkDocumentsInCollection();
-
- auto sentinels = oplogBuffer.getSentinels_forTest();
- ASSERT_EQUALS(sentinels.size(), 4UL);
- ASSERT_EQUALS(sentinels.front(), Timestamp(0, 0));
- sentinels.pop();
- ASSERT_EQUALS(sentinels.front(), Timestamp(1, 1));
- sentinels.pop();
- ASSERT_EQUALS(sentinels.front(), Timestamp(1, 1));
- sentinels.pop();
- ASSERT_EQUALS(sentinels.front(), Timestamp(2, 2));
+ _assertDocumentsInCollectionEquals(_txn.get(), nss, oplog);
BSONObj doc;
ASSERT_TRUE(oplogBuffer.peek(_txn.get(), &doc));
@@ -1070,7 +942,7 @@ TEST_F(OplogBufferCollectionTest, MultipleSentinelsAreReturnedInOrder) {
ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
// tryPop does not remove documents from collection.
- checkDocumentsInCollection();
+ _assertDocumentsInCollectionEquals(_txn.get(), nss, oplog);
}
TEST_F(OplogBufferCollectionTest, WaitForDataBlocksAndFindsSentinel) {