/** * Copyright 2016 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #include "mongo/platform/basic.h" #include #include #include "mongo/base/disallow_copying.h" #include "mongo/db/jsobj.h" #include "mongo/db/repl/oplog_buffer_proxy.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/memory.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" namespace { using namespace mongo; using namespace mongo::repl; class OplogBufferMock : public OplogBuffer { MONGO_DISALLOW_COPYING(OplogBufferMock); public: OplogBufferMock() = default; virtual ~OplogBufferMock() = default; void startup(OperationContext*) override { startupCalled = true; } void shutdown(OperationContext*) override { shutdownCalled = true; } void pushEvenIfFull(OperationContext* opCtx, const Value& value) override { push(opCtx, value); } void push(OperationContext*, const Value& value) override { values.push_back(value); } void pushAllNonBlocking(OperationContext* opCtx, Batch::const_iterator begin, Batch::const_iterator end) override { for (auto i = begin; i != end; ++i) { push(opCtx, *i); } } void waitForSpace(OperationContext*, std::size_t) override { waitForSpaceCalled = true; } bool isEmpty() const override { return values.empty(); } std::size_t getMaxSize() const override { return maxSize; } std::size_t getSize() const override { std::size_t totalSize = 0; for (auto&& obj : values) { totalSize += obj.objsize(); } return totalSize; } std::size_t getCount() const override { return values.size(); } void clear(OperationContext*) override { values.clear(); } bool tryPop(OperationContext* opCtx, Value* value) override { tryPopCalled = true; if (!peek(opCtx, value)) { return false; } values.pop_front(); return true; } bool waitForData(Seconds) override { // Blocking not supported. waitForDataCalled = true; return !values.empty(); } bool peek(OperationContext*, Value* value) override { peekCalled = true; if (values.empty()) { return false; } *value = values.front(); return true; } /** * Returns boost::none because this function should never be called by the proxy. */ boost::optional lastObjectPushed(OperationContext*) const override { lastObjectPushedCalled = true; return boost::none; } bool startupCalled = false; bool shutdownCalled = false; bool waitForSpaceCalled = false; bool waitForDataCalled = false; bool tryPopCalled = false; bool peekCalled = false; mutable bool lastObjectPushedCalled = false; std::deque values; std::size_t maxSize = 0U; }; class OplogBufferProxyTest : public unittest::Test { private: void setUp() override; void tearDown() override; protected: OplogBufferMock* _mock = nullptr; std::unique_ptr _proxy; OperationContext* _opCtx = nullptr; // Not dereferenced. }; void OplogBufferProxyTest::setUp() { auto mock = stdx::make_unique(); _mock = mock.get(); _proxy = stdx::make_unique(std::move(mock)); } void OplogBufferProxyTest::tearDown() { _proxy.reset(); _mock = nullptr; } DEATH_TEST_F(OplogBufferProxyTest, NullTargetOplogBufferAtConstructionTriggersInvariant, "Invariant failure _target") { OplogBufferProxy(nullptr); } TEST_F(OplogBufferProxyTest, GetTarget) { ASSERT_EQUALS(_mock, _proxy->getTarget()); } TEST_F(OplogBufferProxyTest, Startup) { _proxy->startup(_opCtx); ASSERT_TRUE(_mock->startupCalled); } TEST_F(OplogBufferProxyTest, ShutdownResetsCachedValues) { auto pushValue = BSON("x" << 1); _proxy->push(_opCtx, pushValue); OplogBuffer::Value peekValue; ASSERT_TRUE(_proxy->peek(_opCtx, &peekValue)); ASSERT_BSONOBJ_EQ(pushValue, peekValue); ASSERT_NOT_EQUALS(boost::none, _proxy->lastObjectPushed(_opCtx)); ASSERT_NOT_EQUALS(boost::none, _proxy->getLastPeeked_forTest()); _proxy->shutdown(_opCtx); ASSERT_TRUE(_mock->shutdownCalled); ASSERT_EQUALS(boost::none, _proxy->lastObjectPushed(_opCtx)); ASSERT_EQUALS(boost::none, _proxy->getLastPeeked_forTest()); } TEST_F(OplogBufferProxyTest, WaitForSpace) { _proxy->waitForSpace(_opCtx, 100U); ASSERT_TRUE(_mock->waitForSpaceCalled); } TEST_F(OplogBufferProxyTest, MaxSize) { _mock->maxSize = 8888U; ASSERT_EQUALS(_mock->maxSize, _proxy->getMaxSize()); } TEST_F(OplogBufferProxyTest, EmptySizeAndCount) { ASSERT_TRUE(_proxy->isEmpty()); OplogBuffer::Batch values = {BSON("x" << 1), BSON("x" << 2)}; _proxy->pushAllNonBlocking(_opCtx, values.cbegin(), values.cend()); ASSERT_FALSE(_proxy->isEmpty()); ASSERT_EQUALS(values.size(), _mock->getCount()); ASSERT_EQUALS(_mock->getCount(), _proxy->getCount()); ASSERT_EQUALS(std::size_t(values[0].objsize() + values[1].objsize()), _mock->getSize()); ASSERT_EQUALS(_mock->getSize(), _proxy->getSize()); } TEST_F(OplogBufferProxyTest, ClearResetsCachedValues) { OplogBuffer::Batch values = {BSON("x" << 1), BSON("x" << 2)}; _proxy->pushAllNonBlocking(_opCtx, values.cbegin(), values.cend()); ASSERT_FALSE(_mock->isEmpty()); auto lastObjPushed = _proxy->lastObjectPushed(_opCtx); ASSERT_NOT_EQUALS(boost::none, lastObjPushed); ASSERT_BSONOBJ_EQ(values.back(), *lastObjPushed); ASSERT_FALSE(_mock->lastObjectPushedCalled); OplogBuffer::Value peekValue; ASSERT_TRUE(_proxy->peek(_opCtx, &peekValue)); ASSERT_NOT_EQUALS(boost::none, _proxy->getLastPeeked_forTest()); _proxy->clear(_opCtx); ASSERT_TRUE(_mock->isEmpty()); ASSERT_EQUALS(boost::none, _proxy->lastObjectPushed(_opCtx)); ASSERT_EQUALS(boost::none, _proxy->getLastPeeked_forTest()); } void _testPushFunctionUpdatesCachedLastObjectPushed( OperationContext* opCtx, OplogBuffer* proxy, OplogBufferMock* mock, stdx::function pushFn) { ASSERT_EQUALS(proxy->lastObjectPushed(opCtx), boost::none); ASSERT_FALSE(mock->lastObjectPushedCalled); auto val = BSON("x" << 1); auto numPushed = pushFn(opCtx, proxy, val); ASSERT_EQUALS(numPushed, mock->values.size()); ASSERT_BSONOBJ_EQ(val, mock->values.back()); auto lastObjPushed = proxy->lastObjectPushed(opCtx); ASSERT_NOT_EQUALS(boost::none, lastObjPushed); ASSERT_BSONOBJ_EQ(val, *lastObjPushed); ASSERT_FALSE(mock->lastObjectPushedCalled); } TEST_F(OplogBufferProxyTest, PushEvenIfFullUpdatesCachedLastObjectPushed) { auto pushFn = [](OperationContext* opCtx, OplogBuffer* proxy, const OplogBuffer::Value& value) { proxy->pushEvenIfFull(opCtx, value); return 1U; }; _testPushFunctionUpdatesCachedLastObjectPushed(_opCtx, _proxy.get(), _mock, pushFn); } TEST_F(OplogBufferProxyTest, PushUpdatesCachedLastObjectPushed) { auto pushFn = [](OperationContext* opCtx, OplogBuffer* proxy, const OplogBuffer::Value& value) { proxy->push(opCtx, value); return 1U; }; _testPushFunctionUpdatesCachedLastObjectPushed(_opCtx, _proxy.get(), _mock, pushFn); } TEST_F(OplogBufferProxyTest, PushAllNonBlockingUpdatesCachedLastObjectPushed) { auto pushFn = [](OperationContext* opCtx, OplogBuffer* proxy, const OplogBuffer::Value& value) { OplogBuffer::Batch values = {BSON("x" << 2), value}; proxy->pushAllNonBlocking(opCtx, values.cbegin(), values.cend()); return values.size(); }; _testPushFunctionUpdatesCachedLastObjectPushed(_opCtx, _proxy.get(), _mock, pushFn); } TEST_F(OplogBufferProxyTest, PushAllNonBlockingDoesNotUpdateCachedLastObjectPushedOnEmptyBatch) { OplogBuffer::Batch values; _proxy->pushAllNonBlocking(_opCtx, values.cbegin(), values.cend()); ASSERT_EQUALS(values.size(), _mock->values.size()); ASSERT_EQUALS(boost::none, _proxy->lastObjectPushed(_opCtx)); ASSERT_FALSE(_mock->lastObjectPushedCalled); } TEST_F(OplogBufferProxyTest, WaitForDataReturnsTrueImmediatelyIfLastObjectPushedIsCached) { _proxy->pushEvenIfFull(_opCtx, BSON("x" << 1)); ASSERT_TRUE(_proxy->waitForData(Seconds(10))); ASSERT_FALSE(_mock->waitForDataCalled); } TEST_F(OplogBufferProxyTest, WaitForDataForwardsCallToTargetIfLastObjectPushedIsNotCached) { ASSERT_FALSE(_proxy->waitForData(Seconds(10))); ASSERT_TRUE(_mock->waitForDataCalled); } TEST_F(OplogBufferProxyTest, TryPopResetsLastPushedObjectIfBufferIsEmpty) { auto pushValue = BSON("x" << 1); _proxy->push(_opCtx, BSON("x" << 1)); auto lastPushed = _proxy->lastObjectPushed(_opCtx); ASSERT_NOT_EQUALS(boost::none, _proxy->lastObjectPushed(_opCtx)); ASSERT_BSONOBJ_EQ(pushValue, *lastPushed); OplogBuffer::Value poppedValue; ASSERT_TRUE(_proxy->tryPop(_opCtx, &poppedValue)); ASSERT_BSONOBJ_EQ(pushValue, poppedValue); ASSERT_EQUALS(boost::none, _proxy->lastObjectPushed(_opCtx)); // waitForData should forward call to underlying buffer. ASSERT_FALSE(_proxy->waitForData(Seconds(10))); ASSERT_TRUE(_mock->waitForDataCalled); } TEST_F(OplogBufferProxyTest, PeekCachesFrontOfBuffer) { OplogBuffer::Value peekValue; ASSERT_FALSE(_mock->peekCalled); ASSERT_FALSE(_proxy->peek(_opCtx, &peekValue)); ASSERT_TRUE(_mock->peekCalled); ASSERT_TRUE(peekValue.isEmpty()); _mock->peekCalled = false; OplogBuffer::Batch values = {BSON("x" << 1), BSON("x" << 2)}; _proxy->pushAllNonBlocking(_opCtx, values.cbegin(), values.cend()); ASSERT_EQUALS(values.size(), _mock->values.size()); ASSERT_TRUE(_proxy->peek(_opCtx, &peekValue)); ASSERT_TRUE(_mock->peekCalled); ASSERT_BSONOBJ_EQ(values.front(), peekValue); _mock->peekCalled = false; peekValue = OplogBuffer::Value(); ASSERT_TRUE(_proxy->peek(_opCtx, &peekValue)); ASSERT_FALSE(_mock->peekCalled); ASSERT_BSONOBJ_EQ(values.front(), peekValue); } TEST_F(OplogBufferProxyTest, TryPopClearsCachedFrontValue) { OplogBuffer::Batch values = {BSON("x" << 1), BSON("x" << 2)}; _proxy->pushAllNonBlocking(_opCtx, values.cbegin(), values.cend()); ASSERT_EQUALS(values.size(), _mock->values.size()); // Peek and pop first value {x: 1}. OplogBuffer::Value peekValue; ASSERT_TRUE(_proxy->peek(_opCtx, &peekValue)); ASSERT_TRUE(_mock->peekCalled); ASSERT_BSONOBJ_EQ(values.front(), peekValue); _mock->peekCalled = false; peekValue = OplogBuffer::Value(); OplogBuffer::Value poppedValue; ASSERT_TRUE(_proxy->tryPop(_opCtx, &poppedValue)); ASSERT_TRUE(_mock->tryPopCalled); ASSERT_BSONOBJ_EQ(values.front(), poppedValue); ASSERT_EQUALS(boost::none, _proxy->getLastPeeked_forTest()); _mock->tryPopCalled = false; poppedValue = OplogBuffer::Value(); // Peek and pop second value {x: 2}. ASSERT_TRUE(_proxy->peek(_opCtx, &peekValue)); ASSERT_TRUE(_mock->peekCalled); ASSERT_BSONOBJ_EQ(values.back(), peekValue); ASSERT_NOT_EQUALS(boost::none, _proxy->getLastPeeked_forTest()); _mock->peekCalled = false; peekValue = OplogBuffer::Value(); ASSERT_TRUE(_proxy->tryPop(_opCtx, &poppedValue)); ASSERT_TRUE(_mock->tryPopCalled); ASSERT_BSONOBJ_EQ(values.back(), poppedValue); ASSERT_EQUALS(boost::none, _proxy->getLastPeeked_forTest()); _mock->tryPopCalled = false; poppedValue = OplogBuffer::Value(); // Peek and pop empty buffer. ASSERT_FALSE(_proxy->peek(_opCtx, &peekValue)); ASSERT_TRUE(_mock->peekCalled); ASSERT_TRUE(peekValue.isEmpty()); ASSERT_EQUALS(boost::none, _proxy->getLastPeeked_forTest()); ASSERT_FALSE(_proxy->tryPop(_opCtx, &poppedValue)); ASSERT_TRUE(_mock->tryPopCalled); ASSERT_TRUE(poppedValue.isEmpty()); ASSERT_EQUALS(boost::none, _proxy->getLastPeeked_forTest()); } } // namespace