summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2016-10-19 11:22:43 -0400
committerBenety Goh <benety@mongodb.com>2016-10-21 14:13:06 -0400
commit73a5487793df760bc2fd1f5d02340716c41d217f (patch)
tree12eaca19aafbc886daaaa1e70d62be65fff1b6aa
parent53cd93fb9b31b3fe0bfa1fc12d64d9caf7b7f662 (diff)
downloadmongo-73a5487793df760bc2fd1f5d02340716c41d217f.tar.gz
SERVER-26666 store key of last popped document
-rw-r--r--src/mongo/db/repl/oplog_buffer_collection.cpp48
-rw-r--r--src/mongo/db/repl/oplog_buffer_collection.h12
-rw-r--r--src/mongo/db/repl/oplog_buffer_collection_test.cpp40
3 files changed, 87 insertions, 13 deletions
diff --git a/src/mongo/db/repl/oplog_buffer_collection.cpp b/src/mongo/db/repl/oplog_buffer_collection.cpp
index 619c7131169..56a5a7aaf82 100644
--- a/src/mongo/db/repl/oplog_buffer_collection.cpp
+++ b/src/mongo/db/repl/oplog_buffer_collection.cpp
@@ -166,6 +166,8 @@ void OplogBufferCollection::clear(OperationContext* txn) {
_size = 0;
_count = 0;
std::queue<Timestamp>().swap(_sentinels);
+ _lastPushedTimestamp = {};
+ _lastPoppedKey = {};
}
bool OplogBufferCollection::tryPop(OperationContext* txn, Value* value) {
@@ -190,7 +192,7 @@ bool OplogBufferCollection::peek(OperationContext* txn, Value* value) {
if (_count == 0) {
return false;
}
- return _peekOneSide_inlock(txn, value, true);
+ return _peekOneSide_inlock(txn, value, true, PeekMode::kExtractEmbeddedDocument);
}
boost::optional<OplogBuffer::Value> OplogBufferCollection::lastObjectPushed(
@@ -200,7 +202,7 @@ boost::optional<OplogBuffer::Value> OplogBufferCollection::lastObjectPushed(
return boost::none;
}
Value value;
- bool res = _peekOneSide_inlock(txn, &value, false);
+ bool res = _peekOneSide_inlock(txn, &value, false, PeekMode::kExtractEmbeddedDocument);
if (!res) {
return boost::none;
}
@@ -210,18 +212,21 @@ boost::optional<OplogBuffer::Value> OplogBufferCollection::lastObjectPushed(
bool OplogBufferCollection::_pop_inlock(OperationContext* txn, Value* value) {
// If there is a sentinel, and it was pushed right after the last BSONObj to be popped was
// pushed, then we pop off a sentinel instead and decrease the count by 1.
- if (!_sentinels.empty() && (_lastPoppedTimestamp == _sentinels.front())) {
+ if (!_sentinels.empty() && (_getLastPoppedTimestamp_inlock() == _sentinels.front())) {
_sentinels.pop();
_count--;
*value = BSONObj();
return true;
}
- if (!_peekOneSide_inlock(txn, value, true)) {
+ BSONObj docFromCollection;
+ if (!_peekOneSide_inlock(
+ txn, &docFromCollection, true, PeekMode::kReturnUnmodifiedDocumentFromCollection)) {
return false;
}
- _lastPoppedTimestamp = (*value)["ts"].timestamp();
+ _lastPoppedKey = docFromCollection["_id"].wrap("");
+ *value = extractEmbeddedOplogDocument(docFromCollection).getOwned();
invariant(_count > 0);
invariant(_size >= std::size_t(value->objsize()));
_count--;
@@ -231,12 +236,13 @@ bool OplogBufferCollection::_pop_inlock(OperationContext* txn, Value* value) {
bool OplogBufferCollection::_peekOneSide_inlock(OperationContext* txn,
Value* value,
- bool front) const {
+ bool front,
+ PeekMode peekMode) const {
invariant(_count > 0);
// If there is a sentinel, and it was pushed right after the last BSONObj to be popped was
// pushed, then we return an empty BSONObj for the sentinel.
- if (!_sentinels.empty() && (_lastPoppedTimestamp == _sentinels.front())) {
+ if (!_sentinels.empty() && (_getLastPoppedTimestamp_inlock() == _sentinels.front())) {
*value = BSONObj();
return true;
}
@@ -248,8 +254,8 @@ bool OplogBufferCollection::_peekOneSide_inlock(OperationContext* txn,
// Previously popped documents are not actually removed from the collection. When peeking at the
// front of the buffer, we use the last popped timestamp to skip ahead to the first document
// that has not been popped.
- if (front && !_lastPoppedTimestamp.isNull()) {
- startKey = BSON("" << _lastPoppedTimestamp);
+ if (front && !_lastPoppedKey.isEmpty()) {
+ startKey = _lastPoppedKey;
boundInclusion = BoundInclusion::kIncludeEndKeyOnly;
}
@@ -258,7 +264,15 @@ bool OplogBufferCollection::_peekOneSide_inlock(OperationContext* txn,
_storageInterface->findDocuments(
txn, _nss, kIdIdxName, scanDirection, startKey, boundInclusion, 1U));
invariant(1U == docs.size());
- *value = extractEmbeddedOplogDocument(docs.front()).getOwned();
+ switch (peekMode) {
+ case PeekMode::kExtractEmbeddedDocument:
+ *value = extractEmbeddedOplogDocument(docs.front()).getOwned();
+ break;
+ case PeekMode::kReturnUnmodifiedDocumentFromCollection:
+ *value = docs.front();
+ invariant(value->isOwned());
+ break;
+ }
return true;
}
@@ -277,5 +291,19 @@ std::queue<Timestamp> OplogBufferCollection::getSentinels_forTest() const {
return _sentinels;
}
+Timestamp OplogBufferCollection::getLastPushedTimestamp_forTest() const {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _lastPushedTimestamp;
+}
+
+Timestamp OplogBufferCollection::getLastPoppedTimestamp_forTest() const {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _getLastPoppedTimestamp_inlock();
+}
+
+Timestamp OplogBufferCollection::_getLastPoppedTimestamp_inlock() const {
+ return _lastPoppedKey.isEmpty() ? Timestamp() : _lastPoppedKey[""].timestamp();
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_buffer_collection.h b/src/mongo/db/repl/oplog_buffer_collection.h
index e65b20e37c2..01901088224 100644
--- a/src/mongo/db/repl/oplog_buffer_collection.h
+++ b/src/mongo/db/repl/oplog_buffer_collection.h
@@ -93,6 +93,8 @@ public:
// ---- Testing API ----
std::queue<Timestamp> getSentinels_forTest() const;
+ Timestamp getLastPushedTimestamp_forTest() const;
+ Timestamp getLastPoppedTimestamp_forTest() const;
private:
@@ -106,12 +108,16 @@ private:
*/
void _dropCollection(OperationContext* txn);
+ enum class PeekMode { kExtractEmbeddedDocument, kReturnUnmodifiedDocumentFromCollection };
/**
* Returns the last oplog entry on the given side of the buffer. If front is true it will
* return the oldest entry, otherwise it will return the newest one. If the buffer is empty
* or peeking fails this returns false.
*/
- bool _peekOneSide_inlock(OperationContext* txn, Value* value, bool front) const;
+ bool _peekOneSide_inlock(OperationContext* txn,
+ Value* value,
+ bool front,
+ PeekMode peekMode) const;
// Storage interface used to perform storage engine level functions on the collection.
StorageInterface* _storageInterface;
@@ -121,6 +127,8 @@ private:
*/
bool _pop_inlock(OperationContext* txn, Value* value);
+ Timestamp _getLastPoppedTimestamp_inlock() const;
+
// The namespace for the oplog buffer collection.
const NamespaceString _nss;
@@ -141,7 +149,7 @@ private:
Timestamp _lastPushedTimestamp;
- Timestamp _lastPoppedTimestamp;
+ BSONObj _lastPoppedKey;
};
} // namespace repl
diff --git a/src/mongo/db/repl/oplog_buffer_collection_test.cpp b/src/mongo/db/repl/oplog_buffer_collection_test.cpp
index 536ffacc9ae..98c653c8b25 100644
--- a/src/mongo/db/repl/oplog_buffer_collection_test.cpp
+++ b/src/mongo/db/repl/oplog_buffer_collection_test.cpp
@@ -480,14 +480,52 @@ TEST_F(OplogBufferCollectionTest, ClearClearsCollection) {
OplogBufferCollection oplogBuffer(_storageInterface, nss);
oplogBuffer.startup(_txn.get());
- BSONObj oplog = makeOplogEntry(1);
ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
+ ASSERT_EQUALS(oplogBuffer.getSize(), 0UL);
+ ASSERT_EQUALS(oplogBuffer.getSentinels_forTest().size(), 0UL);
+ ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPushedTimestamp_forTest());
+ ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest());
+
+ BSONObj oplog = makeOplogEntry(1);
oplogBuffer.push(_txn.get(), oplog);
ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
+ ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog.objsize()));
+ ASSERT_EQUALS(oplogBuffer.getSentinels_forTest().size(), 0UL);
+ ASSERT_EQUALS(oplog["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp_forTest());
+ ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest());
+
+ BSONObj sentinel;
+ oplogBuffer.push(_txn.get(), sentinel);
+ ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
+ ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog.objsize()));
+ ASSERT_EQUALS(oplogBuffer.getSentinels_forTest().size(), 1UL);
+ ASSERT_EQUALS(oplog["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp_forTest());
+ ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest());
+
+ BSONObj oplog2 = makeOplogEntry(2);
+ oplogBuffer.push(_txn.get(), oplog2);
+ ASSERT_EQUALS(oplogBuffer.getCount(), 3UL);
+ ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog.objsize() + oplog2.objsize()));
+ ASSERT_EQUALS(oplogBuffer.getSentinels_forTest().size(), 1UL);
+ ASSERT_EQUALS(oplog2["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp_forTest());
+ ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest());
+
+ BSONObj poppedDoc;
+ ASSERT_TRUE(oplogBuffer.tryPop(_txn.get(), &poppedDoc));
+ ASSERT_BSONOBJ_EQ(oplog, poppedDoc);
+ ASSERT_EQUALS(oplogBuffer.getCount(), 2UL);
+ ASSERT_EQUALS(oplogBuffer.getSize(), std::size_t(oplog.objsize()));
+ ASSERT_EQUALS(oplogBuffer.getSentinels_forTest().size(), 1UL);
+ ASSERT_EQUALS(oplog2["ts"].timestamp(), oplogBuffer.getLastPushedTimestamp_forTest());
+ ASSERT_EQUALS(oplog["ts"].timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest());
oplogBuffer.clear(_txn.get());
ASSERT_TRUE(AutoGetCollectionForRead(_txn.get(), nss).getCollection());
ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
+ ASSERT_EQUALS(oplogBuffer.getSize(), 0UL);
+ ASSERT_EQUALS(oplogBuffer.getSentinels_forTest().size(), 0UL);
+ ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPushedTimestamp_forTest());
+ ASSERT_EQUALS(Timestamp(), oplogBuffer.getLastPoppedTimestamp_forTest());
{
OplogInterfaceLocal collectionReader(_txn.get(), nss.ns());