diff options
author | Benety Goh <benety@mongodb.com> | 2016-10-19 11:22:43 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2016-10-21 14:13:06 -0400 |
commit | 73a5487793df760bc2fd1f5d02340716c41d217f (patch) | |
tree | 12eaca19aafbc886daaaa1e70d62be65fff1b6aa | |
parent | 53cd93fb9b31b3fe0bfa1fc12d64d9caf7b7f662 (diff) | |
download | mongo-73a5487793df760bc2fd1f5d02340716c41d217f.tar.gz |
SERVER-26666 store key of last popped document
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_collection.cpp | 48 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_collection.h | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_buffer_collection_test.cpp | 40 |
3 files changed, 87 insertions, 13 deletions
diff --git a/src/mongo/db/repl/oplog_buffer_collection.cpp b/src/mongo/db/repl/oplog_buffer_collection.cpp index 619c7131169..56a5a7aaf82 100644 --- a/src/mongo/db/repl/oplog_buffer_collection.cpp +++ b/src/mongo/db/repl/oplog_buffer_collection.cpp @@ -166,6 +166,8 @@ void OplogBufferCollection::clear(OperationContext* txn) { _size = 0; _count = 0; std::queue<Timestamp>().swap(_sentinels); + _lastPushedTimestamp = {}; + _lastPoppedKey = {}; } bool OplogBufferCollection::tryPop(OperationContext* txn, Value* value) { @@ -190,7 +192,7 @@ bool OplogBufferCollection::peek(OperationContext* txn, Value* value) { if (_count == 0) { return false; } - return _peekOneSide_inlock(txn, value, true); + return _peekOneSide_inlock(txn, value, true, PeekMode::kExtractEmbeddedDocument); } boost::optional<OplogBuffer::Value> OplogBufferCollection::lastObjectPushed( @@ -200,7 +202,7 @@ boost::optional<OplogBuffer::Value> OplogBufferCollection::lastObjectPushed( return boost::none; } Value value; - bool res = _peekOneSide_inlock(txn, &value, false); + bool res = _peekOneSide_inlock(txn, &value, false, PeekMode::kExtractEmbeddedDocument); if (!res) { return boost::none; } @@ -210,18 +212,21 @@ boost::optional<OplogBuffer::Value> OplogBufferCollection::lastObjectPushed( bool OplogBufferCollection::_pop_inlock(OperationContext* txn, Value* value) { // If there is a sentinel, and it was pushed right after the last BSONObj to be popped was // pushed, then we pop off a sentinel instead and decrease the count by 1. - if (!_sentinels.empty() && (_lastPoppedTimestamp == _sentinels.front())) { + if (!_sentinels.empty() && (_getLastPoppedTimestamp_inlock() == _sentinels.front())) { _sentinels.pop(); _count--; *value = BSONObj(); return true; } - if (!_peekOneSide_inlock(txn, value, true)) { + BSONObj docFromCollection; + if (!_peekOneSide_inlock( + txn, &docFromCollection, true, PeekMode::kReturnUnmodifiedDocumentFromCollection)) { return false; } - _lastPoppedTimestamp = (*value)["ts"].timestamp(); + _lastPoppedKey = docFromCollection["_id"].wrap(""); + *value = extractEmbeddedOplogDocument(docFromCollection).getOwned(); invariant(_count > 0); invariant(_size >= std::size_t(value->objsize())); _count--; @@ -231,12 +236,13 @@ bool OplogBufferCollection::_pop_inlock(OperationContext* txn, Value* value) { bool OplogBufferCollection::_peekOneSide_inlock(OperationContext* txn, Value* value, - bool front) const { + bool front, + PeekMode peekMode) const { invariant(_count > 0); // If there is a sentinel, and it was pushed right after the last BSONObj to be popped was // pushed, then we return an empty BSONObj for the sentinel. - if (!_sentinels.empty() && (_lastPoppedTimestamp == _sentinels.front())) { + if (!_sentinels.empty() && (_getLastPoppedTimestamp_inlock() == _sentinels.front())) { *value = BSONObj(); return true; } @@ -248,8 +254,8 @@ bool OplogBufferCollection::_peekOneSide_inlock(OperationContext* txn, // Previously popped documents are not actually removed from the collection. When peeking at the // front of the buffer, we use the last popped timestamp to skip ahead to the first document // that has not been popped. - if (front && !_lastPoppedTimestamp.isNull()) { - startKey = BSON("" << _lastPoppedTimestamp); + if (front && !_lastPoppedKey.isEmpty()) { + startKey = _lastPoppedKey; boundInclusion = BoundInclusion::kIncludeEndKeyOnly; } @@ -258,7 +264,15 @@ bool OplogBufferCollection::_peekOneSide_inlock(OperationContext* txn, _storageInterface->findDocuments( txn, _nss, kIdIdxName, scanDirection, startKey, boundInclusion, 1U)); invariant(1U == docs.size()); - *value = extractEmbeddedOplogDocument(docs.front()).getOwned(); + switch (peekMode) { + case PeekMode::kExtractEmbeddedDocument: + *value = extractEmbeddedOplogDocument(docs.front()).getOwned(); + break; + case PeekMode::kReturnUnmodifiedDocumentFromCollection: + *value = docs.front(); + invariant(value->isOwned()); + break; + } return true; } @@ -277,5 +291,19 @@ std::queue<Timestamp> OplogBufferCollection::getSentinels_forTest() const { return _sentinels; } +Timestamp OplogBufferCollection::getLastPushedTimestamp_forTest() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _lastPushedTimestamp; +} + +Timestamp OplogBufferCollection::getLastPoppedTimestamp_forTest() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _getLastPoppedTimestamp_inlock(); +} + +Timestamp OplogBufferCollection::_getLastPoppedTimestamp_inlock() const { + return _lastPoppedKey.isEmpty() ? Timestamp() : _lastPoppedKey[""].timestamp(); +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplog_buffer_collection.h b/src/mongo/db/repl/oplog_buffer_collection.h index e65b20e37c2..01901088224 100644 --- a/src/mongo/db/repl/oplog_buffer_collection.h +++ b/src/mongo/db/repl/oplog_buffer_collection.h @@ -93,6 +93,8 @@ public: // ---- Testing API ---- std::queue<Timestamp> getSentinels_forTest() const; + Timestamp getLastPushedTimestamp_forTest() const; + Timestamp getLastPoppedTimestamp_forTest() const; private: @@ -106,12 +108,16 @@ private: */ void _dropCollection(OperationContext* txn); + enum class PeekMode { kExtractEmbeddedDocument, kReturnUnmodifiedDocumentFromCollection }; /** * Returns the last oplog entry on the given side of the buffer. If front is true it will * return the oldest entry, otherwise it will return the newest one. If the buffer is empty * or peeking fails this returns false. */ - bool _peekOneSide_inlock(OperationContext* txn, Value* value, bool front) const; + bool _peekOneSide_inlock(OperationContext* txn, + Value* value, + bool front, + PeekMode peekMode) const; // Storage interface used to perform storage engine level functions on the collection. StorageInterface* _storageInterface; @@ -121,6 +127,8 @@ private: */ bool _pop_inlock(OperationContext* txn, Value* value); + Timestamp _getLastPoppedTimestamp_inlock() const; + // The namespace for the oplog buffer collection. const NamespaceString _nss; @@ -141,7 +149,7 @@ private: Timestamp _lastPushedTimestamp; - Timestamp _lastPoppedTimestamp; + BSONObj _lastPoppedKey; }; } // namespace repl diff --git a/src/mongo/db/repl/oplog_buffer_collection_test.cpp b/src/mongo/db/repl/oplog_buffer_collection_test.cpp index 536ffacc9ae..98c653c8b25 100644 --- a/src/mongo/db/repl/oplog_buffer_collection_test.cpp +++ b/src/mongo/db/repl/oplog_buffer_collection_test.cpp @@ -480,14 +480,52 @@ TEST_F(OplogBufferCollectionTest, ClearClearsCollection) { OplogBufferCollection oplogBuffer(_storageInterface, nss); oplogBuffer.startup(_txn.get()); - BSONObj oplog = makeOplogEntry(1); ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); + ASSERT_EQUALS(oplogBuffer.getSize(), 0UL); + ASSERT_EQUALS(oplogBuffer.getSentinels_forTest().size(), 0UL); + ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPushedTimestamp_forTest()); + ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest()); + + BSONObj oplog = makeOplogEntry(1); oplogBuffer.push(_txn.get(), oplog); ASSERT_EQUALS(oplogBuffer.getCount(), 1UL); + ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog.objsize())); + ASSERT_EQUALS(oplogBuffer.getSentinels_forTest().size(), 0UL); + ASSERT_EQUALS(oplog["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp_forTest()); + ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest()); + + BSONObj sentinel; + oplogBuffer.push(_txn.get(), sentinel); + ASSERT_EQUALS(oplogBuffer.getCount(), 2UL); + ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog.objsize())); + ASSERT_EQUALS(oplogBuffer.getSentinels_forTest().size(), 1UL); + ASSERT_EQUALS(oplog["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp_forTest()); + ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest()); + + BSONObj oplog2 = makeOplogEntry(2); + oplogBuffer.push(_txn.get(), oplog2); + ASSERT_EQUALS(oplogBuffer.getCount(), 3UL); + ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog.objsize() + oplog2.objsize())); + ASSERT_EQUALS(oplogBuffer.getSentinels_forTest().size(), 1UL); + ASSERT_EQUALS(oplog2["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp_forTest()); + ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest()); + + BSONObj poppedDoc; + ASSERT_TRUE(oplogBuffer.tryPop(_txn.get(), &poppedDoc)); + ASSERT_BSONOBJ_EQ(oplog, poppedDoc); + ASSERT_EQUALS(oplogBuffer.getCount(), 2UL); + ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog.objsize())); + ASSERT_EQUALS(oplogBuffer.getSentinels_forTest().size(), 1UL); + ASSERT_EQUALS(oplog2["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp_forTest()); + ASSERT_EQUALS(oplog["ts"].timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest()); oplogBuffer.clear(_txn.get()); ASSERT_TRUE(AutoGetCollectionForRead(_txn.get(), nss).getCollection()); ASSERT_EQUALS(oplogBuffer.getCount(), 0UL); + ASSERT_EQUALS(oplogBuffer.getSize(), 0UL); + ASSERT_EQUALS(oplogBuffer.getSentinels_forTest().size(), 0UL); + ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPushedTimestamp_forTest()); + ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest()); { OplogInterfaceLocal collectionReader(_txn.get(), nss.ns()); |