diff options
Diffstat (limited to 'src/mongo/db/repl/oplog_batcher.h')
-rw-r--r-- | src/mongo/db/repl/oplog_batcher.h | 220 |
1 files changed, 220 insertions, 0 deletions
diff --git a/src/mongo/db/repl/oplog_batcher.h b/src/mongo/db/repl/oplog_batcher.h new file mode 100644 index 00000000000..092655911e5 --- /dev/null +++ b/src/mongo/db/repl/oplog_batcher.h @@ -0,0 +1,220 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/repl/oplog_buffer.h" +#include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/repl/storage_interface.h" +#include "mongo/stdx/thread.h" +#include "mongo/util/fail_point.h" +#include "mongo/util/invariant.h" + +namespace mongo { +namespace repl { + +class OplogApplier; + +/** + * Stores a batch of oplog entries for oplog application. + */ +class OplogBatch { +public: + explicit OplogBatch(std::size_t batchLimitOps) { + _batch.reserve(batchLimitOps); + } + bool empty() const { + return _batch.empty(); + } + const OplogEntry& front() const { + invariant(!_batch.empty()); + return _batch.front(); + } + const OplogEntry& back() const { + invariant(!_batch.empty()); + return _batch.back(); + } + const std::vector<OplogEntry>& getBatch() const { + return _batch; + } + + void emplace_back(OplogEntry oplog) { + invariant(!_mustShutdown); + _batch.emplace_back(std::move(oplog)); + } + void pop_back() { + _batch.pop_back(); + } + + /** + * A batch with this set indicates that the upstream stages of the pipeline are shutdown and + * no more batches will be coming. + * + * This can only happen with empty batches. + * + * TODO replace the empty object used to signal draining with this. + */ + bool mustShutdown() const { + return _mustShutdown; + } + void setMustShutdownFlag() { + invariant(empty()); + _mustShutdown = true; + } + + /** + * Passes the term when the buffer is exhausted to a higher level in case the node has stepped + * down and then stepped up again. See its caller for more context. + */ + boost::optional<long long> termWhenExhausted() const { + return _termWhenExhausted; + } + void setTermWhenExhausted(long long term) { + invariant(empty()); + _termWhenExhausted = term; + } + + /** + * Leaves this object in an unspecified state. Only assignment and destruction are valid. + */ + std::vector<OplogEntry> releaseBatch() { + return std::move(_batch); + } + +private: + std::vector<OplogEntry> _batch; + bool _mustShutdown = false; + boost::optional<long long> _termWhenExhausted; +}; + +/** + * Consumes batches of oplog entries from the OplogBuffer to give to the oplog applier, freeing + * up space for more operations to be fetched from a sync source and allocated onto the OplogBuffer. + */ +class OplogBatcher { + OplogBatcher(const OplogBatcher&) = delete; + OplogBatcher& operator=(const OplogBatcher&) = delete; + +public: + /** + * Controls what can popped from the oplog buffer into a single batch of operations that can be + * applied using OplogApplier::applyOplogBatch(). + */ + class BatchLimits { + public: + size_t bytes = 0; + size_t ops = 0; + + // If provided, the batch will not include any operations with timestamps after this point. + // This is intended for implementing slaveDelay, so it should be some number of seconds + // before now. + boost::optional<Date_t> slaveDelayLatestTimestamp = {}; + + // If non-null, the batch will include operations with timestamps either + // before-and-including this point or after it, not both. + Timestamp forceBatchBoundaryAfter; + }; + + /** + * Constructs an OplogBatcher + */ + OplogBatcher(OplogApplier* oplogApplier, OplogBuffer* oplogBuffer); + + virtual ~OplogBatcher(); + + /** + * Returns the batch of oplog entries and clears _ops so the batcher can store a new batch. + */ + OplogBatch getNextBatch(Seconds maxWaitTime); + + /** + * Starts up a thread to continuously pull from the OplogBuffer into the OplogBatcher's oplog + * batch. + */ + void startup(StorageInterface* storageInterface); + + /** + * Shuts down the thread that pulls from the OplogBuffer to the oplog batch. + */ + void shutdown(); + + /** + * Returns a new batch of ops to apply. + * A batch may consist of: + * at most "BatchLimits::ops" OplogEntries + * at most "BatchLimits::bytes" worth of OplogEntries + * only OplogEntries from before the "BatchLimits::slaveDelayLatestTimestamp" point + * a single command OplogEntry (excluding applyOps, which are grouped with CRUD ops) + */ + StatusWith<std::vector<OplogEntry>> getNextApplierBatch(OperationContext* opCtx, + const BatchLimits& batchLimits); + +private: + /** + * If slaveDelay is enabled, this function calculates the most recent timestamp of any oplog + * entries that can be be returned in a batch. + */ + boost::optional<Date_t> _calculateSlaveDelayLatestTimestamp(); + + /** + * Pops the operation at the front of the OplogBuffer. + */ + void _consume(OperationContext* opCtx, OplogBuffer* oplogBuffer); + + void _run(StorageInterface* storageInterface); + + OplogApplier* _oplogApplier; + OplogBuffer* const _oplogBuffer; + + Mutex _mutex = MONGO_MAKE_LATCH("OplogBatcher::_mutex"); + stdx::condition_variable _cv; + + /** + * The latest batch of oplog entries ready for the applier. + */ + OplogBatch _ops; + + std::unique_ptr<stdx::thread> _thread; +}; + +/** + * Returns maximum number of operations in each batch that can be applied using + * applyOplogBatch(). + */ +std::size_t getBatchLimitOplogEntries(); + +/** + * Calculates batch limit size (in bytes) using the maximum capped collection size of the oplog + * size. + * Batches are limited to 10% of the oplog. + */ +std::size_t getBatchLimitOplogBytes(OperationContext* opCtx, StorageInterface* storageInterface); + +} // namespace repl +} // namespace mongo |