summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2016-09-16 14:03:47 -0400
committerBenety Goh <benety@mongodb.com>2016-09-16 16:14:06 -0400
commit446c059c3086dcb579816d29b8e1b43da13e8dbb (patch)
tree15d1078421c2bc1de442ddba7cb501cdbfc45223 /src/mongo/db
parent4df2c88f48426b3692d8bf507dda02753d0c5785 (diff)
downloadmongo-446c059c3086dcb579816d29b8e1b43da13e8dbb.tar.gz
SERVER-25268 renamed OplogBuffer::blockingPeek to waitForData and removed peek functionality
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/repl/bgsync.cpp6
-rw-r--r--src/mongo/db/repl/bgsync.h2
-rw-r--r--src/mongo/db/repl/oplog_buffer.h4
-rw-r--r--src/mongo/db/repl/oplog_buffer_blocking_queue.cpp5
-rw-r--r--src/mongo/db/repl/oplog_buffer_blocking_queue.h2
-rw-r--r--src/mongo/db/repl/oplog_buffer_collection.cpp6
-rw-r--r--src/mongo/db/repl/oplog_buffer_collection.h2
-rw-r--r--src/mongo/db/repl/oplog_buffer_collection_test.cpp41
-rw-r--r--src/mongo/db/repl/sync_tail.cpp2
9 files changed, 34 insertions, 36 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 77f803fd303..ad7cb8a744c 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -529,11 +529,9 @@ bool BackgroundSync::peek(OperationContext* txn, BSONObj* op) {
return _oplogBuffer->peek(txn, op);
}
-void BackgroundSync::waitForMore(OperationContext* txn) {
- BSONObj op;
+void BackgroundSync::waitForMore() {
// Block for one second before timing out.
- // Ignore the value of the op we peeked at.
- _oplogBuffer->blockingPeek(txn, &op, Seconds(1));
+ _oplogBuffer->waitForData(Seconds(1));
}
void BackgroundSync::consume(OperationContext* txn) {
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index 79305495652..4feed8a6d49 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -97,7 +97,7 @@ public:
bool peek(OperationContext* txn, BSONObj* op);
void consume(OperationContext* txn);
void clearSyncTarget();
- void waitForMore(OperationContext* txn);
+ void waitForMore();
// For monitoring
BSONObj getCounters();
diff --git a/src/mongo/db/repl/oplog_buffer.h b/src/mongo/db/repl/oplog_buffer.h
index f5ef5ae9d97..9695260c691 100644
--- a/src/mongo/db/repl/oplog_buffer.h
+++ b/src/mongo/db/repl/oplog_buffer.h
@@ -146,9 +146,9 @@ public:
/**
* Waits "waitDuration" for an operation to be pushed into the oplog buffer.
* Returns false if oplog buffer is still empty after "waitDuration".
- * Otherwise, returns true and sets "value" to last item in oplog buffer.
+ * Otherwise, returns true.
*/
- virtual bool blockingPeek(OperationContext* txn, Value* value, Seconds waitDuration) = 0;
+ virtual bool waitForData(Seconds waitDuration) = 0;
/**
* Returns false if oplog buffer is empty.
diff --git a/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp b/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp
index 67267b2bdc9..9b9cdb82dac 100644
--- a/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp
+++ b/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp
@@ -95,8 +95,9 @@ bool OplogBufferBlockingQueue::tryPop(OperationContext*, Value* value) {
return _queue.tryPop(*value);
}
-bool OplogBufferBlockingQueue::blockingPeek(OperationContext*, Value* value, Seconds waitDuration) {
- return _queue.blockingPeek(*value, static_cast<int>(durationCount<Seconds>(waitDuration)));
+bool OplogBufferBlockingQueue::waitForData(Seconds waitDuration) {
+ Value ignored;
+ return _queue.blockingPeek(ignored, static_cast<int>(durationCount<Seconds>(waitDuration)));
}
bool OplogBufferBlockingQueue::peek(OperationContext*, Value* value) {
diff --git a/src/mongo/db/repl/oplog_buffer_blocking_queue.h b/src/mongo/db/repl/oplog_buffer_blocking_queue.h
index 490b863f1f3..b0fa36a8157 100644
--- a/src/mongo/db/repl/oplog_buffer_blocking_queue.h
+++ b/src/mongo/db/repl/oplog_buffer_blocking_queue.h
@@ -55,7 +55,7 @@ public:
std::size_t getCount() const override;
void clear(OperationContext* txn) override;
bool tryPop(OperationContext* txn, Value* value) override;
- bool blockingPeek(OperationContext* txn, Value* value, Seconds waitDuration) override;
+ bool waitForData(Seconds waitDuration) override;
bool peek(OperationContext* txn, Value* value) override;
boost::optional<Value> lastObjectPushed(OperationContext* txn) const override;
diff --git a/src/mongo/db/repl/oplog_buffer_collection.cpp b/src/mongo/db/repl/oplog_buffer_collection.cpp
index fe27a6870f3..65082937bc8 100644
--- a/src/mongo/db/repl/oplog_buffer_collection.cpp
+++ b/src/mongo/db/repl/oplog_buffer_collection.cpp
@@ -176,15 +176,13 @@ bool OplogBufferCollection::tryPop(OperationContext* txn, Value* value) {
return _pop_inlock(txn, value);
}
-bool OplogBufferCollection::blockingPeek(OperationContext* txn,
- Value* value,
- Seconds waitDuration) {
+bool OplogBufferCollection::waitForData(Seconds waitDuration) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
if (!_cvNoLongerEmpty.wait_for(
lk, waitDuration.toSystemDuration(), [&]() { return _count != 0; })) {
return false;
}
- return _peekOneSide_inlock(txn, value, true);
+ return _count != 0;
}
bool OplogBufferCollection::peek(OperationContext* txn, Value* value) {
diff --git a/src/mongo/db/repl/oplog_buffer_collection.h b/src/mongo/db/repl/oplog_buffer_collection.h
index f98fa5f1fd9..e65b20e37c2 100644
--- a/src/mongo/db/repl/oplog_buffer_collection.h
+++ b/src/mongo/db/repl/oplog_buffer_collection.h
@@ -87,7 +87,7 @@ public:
std::size_t getCount() const override;
void clear(OperationContext* txn) override;
bool tryPop(OperationContext* txn, Value* value) override;
- bool blockingPeek(OperationContext* txn, Value* value, Seconds waitDuration) override;
+ bool waitForData(Seconds waitDuration) override;
bool peek(OperationContext* txn, Value* value) override;
boost::optional<Value> lastObjectPushed(OperationContext* txn) const override;
diff --git a/src/mongo/db/repl/oplog_buffer_collection_test.cpp b/src/mongo/db/repl/oplog_buffer_collection_test.cpp
index af5a6c9e965..f91634276cb 100644
--- a/src/mongo/db/repl/oplog_buffer_collection_test.cpp
+++ b/src/mongo/db/repl/oplog_buffer_collection_test.cpp
@@ -481,7 +481,7 @@ TEST_F(OplogBufferCollectionTest, ClearClearsCollection) {
ASSERT_TRUE(doc.isEmpty());
}
-TEST_F(OplogBufferCollectionTest, BlockingPeekBlocksAndFindsDocument) {
+TEST_F(OplogBufferCollectionTest, WaitForDataBlocksAndFindsDocument) {
auto nss = makeNamespace(_agent);
OplogBufferCollection oplogBuffer(_storageInterface, nss);
oplogBuffer.startup(_txn.get());
@@ -495,7 +495,7 @@ TEST_F(OplogBufferCollectionTest, BlockingPeekBlocksAndFindsDocument) {
stdx::thread peekingThread([&]() {
Client::initThread("peekingThread");
barrier.countDownAndWait();
- success = oplogBuffer.blockingPeek(makeOperationContext().get(), &doc, Seconds(30));
+ success = oplogBuffer.waitForData(Seconds(30));
count = oplogBuffer.getCount();
});
@@ -505,36 +505,35 @@ TEST_F(OplogBufferCollectionTest, BlockingPeekBlocksAndFindsDocument) {
peekingThread.join();
ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
ASSERT_TRUE(success);
+ ASSERT_TRUE(oplogBuffer.peek(_txn.get(), &doc));
ASSERT_BSONOBJ_EQ(doc, oplog);
ASSERT_EQUALS(count, 1UL);
}
-TEST_F(OplogBufferCollectionTest, TwoBlockingPeeksBlockAndFindSameDocument) {
+TEST_F(OplogBufferCollectionTest, TwoWaitForDataInvocationsBlockAndFindSameDocument) {
auto nss = makeNamespace(_agent);
OplogBufferCollection oplogBuffer(_storageInterface, nss);
oplogBuffer.startup(_txn.get());
unittest::Barrier barrier(3U);
BSONObj oplog = makeOplogEntry(1);
- BSONObj doc1;
bool success1 = false;
std::size_t count1 = 0;
- BSONObj doc2;
bool success2 = false;
std::size_t count2 = 0;
stdx::thread peekingThread1([&]() {
Client::initThread("peekingThread1");
barrier.countDownAndWait();
- success1 = oplogBuffer.blockingPeek(makeOperationContext().get(), &doc1, Seconds(30));
+ success1 = oplogBuffer.waitForData(Seconds(30));
count1 = oplogBuffer.getCount();
});
stdx::thread peekingThread2([&]() {
Client::initThread("peekingThread2");
barrier.countDownAndWait();
- success2 = oplogBuffer.blockingPeek(makeOperationContext().get(), &doc2, Seconds(30));
+ success2 = oplogBuffer.waitForData(Seconds(30));
count2 = oplogBuffer.getCount();
});
@@ -545,14 +544,15 @@ TEST_F(OplogBufferCollectionTest, TwoBlockingPeeksBlockAndFindSameDocument) {
peekingThread2.join();
ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
ASSERT_TRUE(success1);
- ASSERT_BSONOBJ_EQ(doc1, oplog);
+ BSONObj doc;
+ ASSERT_TRUE(oplogBuffer.peek(_txn.get(), &doc));
+ ASSERT_BSONOBJ_EQ(doc, oplog);
ASSERT_EQUALS(count1, 1UL);
ASSERT_TRUE(success2);
- ASSERT_BSONOBJ_EQ(doc2, oplog);
ASSERT_EQUALS(count2, 1UL);
}
-TEST_F(OplogBufferCollectionTest, BlockingPeekBlocksAndTimesOutWhenItDoesNotFindDocument) {
+TEST_F(OplogBufferCollectionTest, WaitForDataBlocksAndTimesOutWhenItDoesNotFindDocument) {
auto nss = makeNamespace(_agent);
OplogBufferCollection oplogBuffer(_storageInterface, nss);
oplogBuffer.startup(_txn.get());
@@ -563,7 +563,7 @@ TEST_F(OplogBufferCollectionTest, BlockingPeekBlocksAndTimesOutWhenItDoesNotFind
stdx::thread peekingThread([&]() {
Client::initThread("peekingThread");
- success = oplogBuffer.blockingPeek(makeOperationContext().get(), &doc, Seconds(1));
+ success = oplogBuffer.waitForData(Seconds(1));
count = oplogBuffer.getCount();
});
@@ -571,6 +571,7 @@ TEST_F(OplogBufferCollectionTest, BlockingPeekBlocksAndTimesOutWhenItDoesNotFind
peekingThread.join();
ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
ASSERT_FALSE(success);
+ ASSERT_FALSE(oplogBuffer.peek(_txn.get(), &doc));
ASSERT_TRUE(doc.isEmpty());
ASSERT_EQUALS(count, 0UL);
}
@@ -927,7 +928,7 @@ TEST_F(OplogBufferCollectionTest, MultipleSentinelsAreReturnedInOrder) {
ASSERT_EQUALS(oplogBuffer.getCount(), 0UL);
}
-TEST_F(OplogBufferCollectionTest, BlockingPeekBlocksAndFindsSentinel) {
+TEST_F(OplogBufferCollectionTest, WaitForDataBlocksAndFindsSentinel) {
auto nss = makeNamespace(_agent);
OplogBufferCollection oplogBuffer(_storageInterface, nss);
oplogBuffer.startup(_txn.get());
@@ -941,7 +942,7 @@ TEST_F(OplogBufferCollectionTest, BlockingPeekBlocksAndFindsSentinel) {
stdx::thread peekingThread([&]() {
Client::initThread("peekingThread");
barrier.countDownAndWait();
- success = oplogBuffer.blockingPeek(makeOperationContext().get(), &doc, Seconds(30));
+ success = oplogBuffer.waitForData(Seconds(30));
count = oplogBuffer.getCount();
});
@@ -951,36 +952,35 @@ TEST_F(OplogBufferCollectionTest, BlockingPeekBlocksAndFindsSentinel) {
peekingThread.join();
ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
ASSERT_TRUE(success);
+ ASSERT_TRUE(oplogBuffer.peek(_txn.get(), &doc));
ASSERT_TRUE(doc.isEmpty());
ASSERT_EQUALS(count, 1UL);
}
-TEST_F(OplogBufferCollectionTest, TwoBlockingPeeksBlockAndFindSameSentinel) {
+TEST_F(OplogBufferCollectionTest, TwoWaitForDataInvocationsBlockAndFindSameSentinel) {
auto nss = makeNamespace(_agent);
OplogBufferCollection oplogBuffer(_storageInterface, nss);
oplogBuffer.startup(_txn.get());
unittest::Barrier barrier(3U);
BSONObj oplog;
- BSONObj doc1;
bool success1 = false;
std::size_t count1 = 0;
- BSONObj doc2;
bool success2 = false;
std::size_t count2 = 0;
stdx::thread peekingThread1([&]() {
Client::initThread("peekingThread1");
barrier.countDownAndWait();
- success1 = oplogBuffer.blockingPeek(makeOperationContext().get(), &doc1, Seconds(30));
+ success1 = oplogBuffer.waitForData(Seconds(30));
count1 = oplogBuffer.getCount();
});
stdx::thread peekingThread2([&]() {
Client::initThread("peekingThread2");
barrier.countDownAndWait();
- success2 = oplogBuffer.blockingPeek(makeOperationContext().get(), &doc2, Seconds(30));
+ success2 = oplogBuffer.waitForData(Seconds(30));
count2 = oplogBuffer.getCount();
});
@@ -991,10 +991,11 @@ TEST_F(OplogBufferCollectionTest, TwoBlockingPeeksBlockAndFindSameSentinel) {
peekingThread2.join();
ASSERT_EQUALS(oplogBuffer.getCount(), 1UL);
ASSERT_TRUE(success1);
- ASSERT_TRUE(doc1.isEmpty());
+ BSONObj doc;
+ ASSERT_TRUE(oplogBuffer.peek(_txn.get(), &doc));
+ ASSERT_TRUE(doc.isEmpty());
ASSERT_EQUALS(count1, 1UL);
ASSERT_TRUE(success2);
- ASSERT_TRUE(doc2.isEmpty());
ASSERT_EQUALS(count2, 1UL);
}
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index a9586fdd28c..91d6592ad58 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -833,7 +833,7 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn,
} else {
// Block up to 1 second. We still return true in this case because we want this
// op to be the first in a new batch with a new start time.
- _networkQueue->waitForMore(txn);
+ _networkQueue->waitForMore();
}
}