summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/oplog_batcher.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/oplog_batcher.h')
-rw-r--r--src/mongo/db/repl/oplog_batcher.h220
1 files changed, 220 insertions, 0 deletions
diff --git a/src/mongo/db/repl/oplog_batcher.h b/src/mongo/db/repl/oplog_batcher.h
new file mode 100644
index 00000000000..092655911e5
--- /dev/null
+++ b/src/mongo/db/repl/oplog_batcher.h
@@ -0,0 +1,220 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/repl/oplog_buffer.h"
+#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/repl/storage_interface.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/util/fail_point.h"
+#include "mongo/util/invariant.h"
+
+namespace mongo {
+namespace repl {
+
+class OplogApplier;
+
+/**
+ * Stores a batch of oplog entries for oplog application.
+ */
+class OplogBatch {
+public:
+ explicit OplogBatch(std::size_t batchLimitOps) {
+ _batch.reserve(batchLimitOps);
+ }
+ bool empty() const {
+ return _batch.empty();
+ }
+ const OplogEntry& front() const {
+ invariant(!_batch.empty());
+ return _batch.front();
+ }
+ const OplogEntry& back() const {
+ invariant(!_batch.empty());
+ return _batch.back();
+ }
+ const std::vector<OplogEntry>& getBatch() const {
+ return _batch;
+ }
+
+ void emplace_back(OplogEntry oplog) {
+ invariant(!_mustShutdown);
+ _batch.emplace_back(std::move(oplog));
+ }
+ void pop_back() {
+ _batch.pop_back();
+ }
+
+ /**
+ * A batch with this set indicates that the upstream stages of the pipeline are shutdown and
+ * no more batches will be coming.
+ *
+ * This can only happen with empty batches.
+ *
+ * TODO replace the empty object used to signal draining with this.
+ */
+ bool mustShutdown() const {
+ return _mustShutdown;
+ }
+ void setMustShutdownFlag() {
+ invariant(empty());
+ _mustShutdown = true;
+ }
+
+ /**
+ * Passes the term when the buffer is exhausted to a higher level in case the node has stepped
+ * down and then stepped up again. See its caller for more context.
+ */
+ boost::optional<long long> termWhenExhausted() const {
+ return _termWhenExhausted;
+ }
+ void setTermWhenExhausted(long long term) {
+ invariant(empty());
+ _termWhenExhausted = term;
+ }
+
+ /**
+ * Leaves this object in an unspecified state. Only assignment and destruction are valid.
+ */
+ std::vector<OplogEntry> releaseBatch() {
+ return std::move(_batch);
+ }
+
+private:
+ std::vector<OplogEntry> _batch;
+ bool _mustShutdown = false;
+ boost::optional<long long> _termWhenExhausted;
+};
+
+/**
+ * Consumes batches of oplog entries from the OplogBuffer to give to the oplog applier, freeing
+ * up space for more operations to be fetched from a sync source and allocated onto the OplogBuffer.
+ */
+class OplogBatcher {
+ OplogBatcher(const OplogBatcher&) = delete;
+ OplogBatcher& operator=(const OplogBatcher&) = delete;
+
+public:
+ /**
+ * Controls what can popped from the oplog buffer into a single batch of operations that can be
+ * applied using OplogApplier::applyOplogBatch().
+ */
+ class BatchLimits {
+ public:
+ size_t bytes = 0;
+ size_t ops = 0;
+
+ // If provided, the batch will not include any operations with timestamps after this point.
+ // This is intended for implementing slaveDelay, so it should be some number of seconds
+ // before now.
+ boost::optional<Date_t> slaveDelayLatestTimestamp = {};
+
+ // If non-null, the batch will include operations with timestamps either
+ // before-and-including this point or after it, not both.
+ Timestamp forceBatchBoundaryAfter;
+ };
+
+ /**
+ * Constructs an OplogBatcher
+ */
+ OplogBatcher(OplogApplier* oplogApplier, OplogBuffer* oplogBuffer);
+
+ virtual ~OplogBatcher();
+
+ /**
+ * Returns the batch of oplog entries and clears _ops so the batcher can store a new batch.
+ */
+ OplogBatch getNextBatch(Seconds maxWaitTime);
+
+ /**
+ * Starts up a thread to continuously pull from the OplogBuffer into the OplogBatcher's oplog
+ * batch.
+ */
+ void startup(StorageInterface* storageInterface);
+
+ /**
+ * Shuts down the thread that pulls from the OplogBuffer to the oplog batch.
+ */
+ void shutdown();
+
+ /**
+ * Returns a new batch of ops to apply.
+ * A batch may consist of:
+ * at most "BatchLimits::ops" OplogEntries
+ * at most "BatchLimits::bytes" worth of OplogEntries
+ * only OplogEntries from before the "BatchLimits::slaveDelayLatestTimestamp" point
+ * a single command OplogEntry (excluding applyOps, which are grouped with CRUD ops)
+ */
+ StatusWith<std::vector<OplogEntry>> getNextApplierBatch(OperationContext* opCtx,
+ const BatchLimits& batchLimits);
+
+private:
+ /**
+ * If slaveDelay is enabled, this function calculates the most recent timestamp of any oplog
+ * entries that can be be returned in a batch.
+ */
+ boost::optional<Date_t> _calculateSlaveDelayLatestTimestamp();
+
+ /**
+ * Pops the operation at the front of the OplogBuffer.
+ */
+ void _consume(OperationContext* opCtx, OplogBuffer* oplogBuffer);
+
+ void _run(StorageInterface* storageInterface);
+
+ OplogApplier* _oplogApplier;
+ OplogBuffer* const _oplogBuffer;
+
+ Mutex _mutex = MONGO_MAKE_LATCH("OplogBatcher::_mutex");
+ stdx::condition_variable _cv;
+
+ /**
+ * The latest batch of oplog entries ready for the applier.
+ */
+ OplogBatch _ops;
+
+ std::unique_ptr<stdx::thread> _thread;
+};
+
+/**
+ * Returns maximum number of operations in each batch that can be applied using
+ * applyOplogBatch().
+ */
+std::size_t getBatchLimitOplogEntries();
+
+/**
+ * Calculates batch limit size (in bytes) using the maximum capped collection size of the oplog
+ * size.
+ * Batches are limited to 10% of the oplog.
+ */
+std::size_t getBatchLimitOplogBytes(OperationContext* opCtx, StorageInterface* storageInterface);
+
+} // namespace repl
+} // namespace mongo