/** * Copyright (C) 2016 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #pragma once #include "mongo/db/namespace_string.h" #include "mongo/db/repl/oplog_buffer.h" #include "mongo/stdx/mutex.h" #include "mongo/util/queue.h" namespace mongo { namespace repl { class StorageInterface; /** * Oplog buffer backed by a temporary collection. This collection is created in startup() and * removed in shutdown(). The documents will be popped and peeked in timestamp order. */ class OplogBufferCollection : public OplogBuffer { public: /** * Returns default namespace for temporary collection used to hold data in oplog buffer. */ static NamespaceString getDefaultNamespace(); /** * Returns the embedded document in the 'entry' field. */ static BSONObj extractEmbeddedOplogDocument(const BSONObj& orig); /** * Returns a pair of a BSONObj with an '_id' field equal to the 'ts' field of the provided * document and an 'entry' field equal to the provided document, and the timestamp of the * BSONObj. Assumes there is a 'ts' field in the original document. */ static std::pair addIdToDocument(const BSONObj& orig); explicit OplogBufferCollection(StorageInterface* storageInterface); OplogBufferCollection(StorageInterface* storageInterface, const NamespaceString& nss); /** * Returns the namespace string of the collection used by this oplog buffer. */ NamespaceString getNamespace() const; void startup(OperationContext* txn) override; void shutdown(OperationContext* txn) override; void pushEvenIfFull(OperationContext* txn, const Value& value) override; void push(OperationContext* txn, const Value& value) override; /** * Pushing documents with 'pushAllNonBlocking' will not handle sentinel documents properly. If * pushing sentinel documents is required, use 'push' or 'pushEvenIfFull'. */ bool pushAllNonBlocking(OperationContext* txn, Batch::const_iterator begin, Batch::const_iterator end) override; void waitForSpace(OperationContext* txn, std::size_t size) override; bool isEmpty() const override; std::size_t getMaxSize() const override; std::size_t getSize() const override; std::size_t getCount() const override; void clear(OperationContext* txn) override; bool tryPop(OperationContext* txn, Value* value) override; bool blockingPeek(OperationContext* txn, Value* value, Seconds waitDuration) override; bool peek(OperationContext* txn, Value* value) override; boost::optional lastObjectPushed(OperationContext* txn) const override; // ---- Testing API ---- std::queue getSentinels_forTest() const; private: /* * Creates a temporary collection with the _nss namespace. */ void _createCollection(OperationContext* txn); /* * Drops the collection with the _nss namespace. */ void _dropCollection(OperationContext* txn); /** * Returns the last oplog entry on the given side of the buffer. If front is true it will * return the oldest entry, otherwise it will return the newest one. If the buffer is empty * or peeking fails this returns false. */ bool _peekOneSide_inlock(OperationContext* txn, Value* value, bool front) const; // Storage interface used to perform storage engine level functions on the collection. StorageInterface* _storageInterface; /** * Pops an entry off the buffer in a lock. */ bool _doPop_inlock(OperationContext* txn, Value* value); // The namespace for the oplog buffer collection. const NamespaceString _nss; // Allows functions to wait until the queue has data. This condition variable is used with // _mutex below. stdx::condition_variable _cvNoLongerEmpty; // Protects member data below and synchronizes it with the underlying collection. mutable stdx::mutex _mutex; // Number of documents in buffer. std::size_t _count; // Size of documents in buffer. std::size_t _size; std::queue _sentinels; Timestamp _lastPushedTimestamp; Timestamp _lastPoppedTimestamp; }; } // namespace repl } // namespace mongo