/** * Copyright (C) 2008 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see <http://www.gnu.org/licenses/>. * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. 
*/ #pragma once #include #include "mongo/base/status.h" #include "mongo/bson/bsonobj.h" #include "mongo/db/repl/multiapplier.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/storage/mmap_v1/dur.h" #include "mongo/stdx/functional.h" #include "mongo/util/concurrency/old_thread_pool.h" namespace mongo { class Database; class OperationContext; namespace repl { class BackgroundSyncInterface; class ReplicationCoordinator; class OpTime; /** * "Normal" replica set syncing */ class SyncTail { public: using MultiSyncApplyFunc = stdx::function& ops, SyncTail* st)>; /** * Type of function to increment "repl.apply.ops" server status metric. */ using IncrementOpsAppliedStatsFn = stdx::function; /** * Type of function that takes a non-command op and applies it locally. * Used for applying from an oplog. * 'db' is the database where the op will be applied. * 'opObj' is a BSONObj describing the op to be applied. * 'convertUpdateToUpsert' indicates to convert some updates to upserts for idempotency reasons. * 'opCounter' is used to update server status metrics. * Returns failure status if the op was an update that could not be applied. */ using ApplyOperationInLockFn = stdx::function; /** * Type of function that takes a command op and applies it locally. * Used for applying from an oplog. * Returns failure status if the op that could not be applied. */ using ApplyCommandInLockFn = stdx::function; SyncTail(BackgroundSyncInterface* q, MultiSyncApplyFunc func); virtual ~SyncTail(); /** * Applies the operation that is in param o. * Functions for applying operations/commands and increment server status counters may * be overridden for testing. 
*/ static Status syncApply(OperationContext* txn, const BSONObj& o, bool convertUpdateToUpsert, ApplyOperationInLockFn applyOperationInLock, ApplyCommandInLockFn applyCommandInLock, IncrementOpsAppliedStatsFn incrementOpsAppliedStats); static Status syncApply(OperationContext* txn, const BSONObj& o, bool convertUpdateToUpsert); void oplogApplication(); bool peek(BSONObj* obj); class OpQueue { public: OpQueue() : _size(0) {} size_t getSize() const { return _size; } const std::deque& getDeque() const { return _deque; } void push_back(OplogEntry&& op) { _size += op.raw.objsize(); _deque.push_back(std::move(op)); } bool empty() const { return _deque.empty(); } const OplogEntry& back() const { invariant(!_deque.empty()); return _deque.back(); } private: std::deque _deque; size_t _size; }; // returns true if we should continue waiting for BSONObjs, false if we should // stop waiting and apply the queue we have. Only returns false if !ops.empty(). bool tryPopAndWaitForMore(OperationContext* txn, OpQueue* ops); /** * Fetch a single document referenced in the operation from the sync source. */ virtual BSONObj getMissingDoc(OperationContext* txn, Database* db, const BSONObj& o); /** * If applyOperation_inlock should be called again after an update fails. */ virtual bool shouldRetry(OperationContext* txn, const BSONObj& o); void setHostname(const std::string& hostname); /** * Returns writer thread pool. * Used by ReplicationCoordinatorExternalStateImpl only. */ OldThreadPool* getWriterPool(); /** * This variable determines the number of writer threads SyncTail will have. It has a default * value, which varies based on architecture and can be overridden using the * "replWriterThreadCount" server parameter. */ static int replWriterThreadCount; protected: // Cap the batches using the limit on journal commits. 
// This works out to be 100 MB (64 bit) or 50 MB (32 bit) static const unsigned int replBatchLimitBytes = dur::UncommittedBytesLimit; static const int replBatchLimitSeconds = 1; static const unsigned int replBatchLimitOperations = 5000; // Apply a batch of operations, using multiple threads. // Returns the last OpTime applied during the apply batch, ops.end["ts"] basically. OpTime multiApply(OperationContext* txn, const OpQueue& ops); private: class OpQueueBatcher; std::string _hostname; BackgroundSyncInterface* _networkQueue; // Function to use during applyOps MultiSyncApplyFunc _applyFunc; // persistent pool of worker threads for writing ops to the databases OldThreadPool _writerPool; }; /** * Applies the operations described in the oplog entries contained in "ops" using the * "applyOperation" function. * * Returns ErrorCode::InterruptedAtShutdown if the node enters shutdown while applying ops, * ErrorCodes::CannotApplyOplogWhilePrimary if the node has become primary, and the OpTime of the * final operation applied otherwise. * * Shared between here and MultiApplier. */ StatusWith multiApply(OperationContext* txn, OldThreadPool* workerPool, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn applyOperation); // These free functions are used by the thread pool workers to write ops to the db. void multiSyncApply(const std::vector& ops, SyncTail* st); void multiInitialSyncApply(const std::vector& ops, SyncTail* st); } // namespace repl } // namespace mongo